赞
踩
前面说了golang的channel, 今天我们看看golang select 是怎么实现的。
- type scase struct {
- c *hchan // chan
- elem unsafe.Pointer // 数据
- }
select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel
编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。
- func main() {
- select {}
- }
如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic
在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。
同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数
- func main() {
- ch := make(chan int)
- go func() {
- ch <- 1
- }()
- select {
- case data := <-ch:
- fmt.Println("ch data:", data)
- }
- }
如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)
如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写
- func walkSelectCases(cases []*ir.CommClause) []ir.Node {
- // optimization: one-case select: single op.
- if ncas == 1 {
- cas := cases[0] //获取case
- ir.SetPos(cas)
- l := cas.Init()
- if cas.Comm != nil { // 不是默认
- n := cas.Comm // 获取case的条件语句
- l = append(l, ir.TakeInit(n)...)
- switch n.Op() {
- default:
- base.Fatalf("select %v", n.Op())
-
- case ir.OSEND: // 如果是 send, 无须处理
- // already ok
-
- case ir.OSELRECV2:
- r := n.(*ir.AssignListStmt)
- // 如果不是 data, ok := <- ch 类型,处理成<- ch
- if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
- n = r.Rhs[0]
- break
- }
- // 是的话, op设置成data, ok := <- ch形式
- r.SetOp(ir.OAS2RECV)
- }
-
- l = append(l, n)
- }
- // 将case 条件后要执行的语句加入带执行的列表
- l = append(l, cas.Body...)
- // 加入 break类型,跳出select-case
- l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
- return l
- }
- // convert case value arguments to addresses.
- // this rewrite is used by both the general code and the next optimization.
- var dflt *ir.CommClause
- for _, cas := range cases {
- ir.SetPos(cas)
- n := cas.Comm
- if n == nil {
- dflt = cas
- continue
- }
- switch n.Op() {
- case ir.OSEND:
- n := n.(*ir.SendStmt)
- n.Value = typecheck.NodAddr(n.Value)
- n.Value = typecheck.Expr(n.Value)
-
- case ir.OSELRECV2:
- n := n.(*ir.AssignListStmt)
- if !ir.IsBlank(n.Lhs[0]) {
- n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
- n.Lhs[0] = typecheck.Expr(n.Lhs[0])
- }
- }
- }
- }
- func main() {
- ch := make(chan int)
- select {
- case data := <-ch:
- fmt.Println("ch data:", data)
- default:
- fmt.Println("default")
- }
- }
如果写入就走<- 读取,反之走默认
如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的
- func walkSelectCases(cases []*ir.CommClause) []ir.Node){
- // optimization: two-case select but one is default: single non-blocking op.
- if ncas == 2 && dflt != nil {
- cas := cases[0]
- if cas == dflt { // 如果是default 放在 cases[1]
- cas = cases[1]
- }
-
- n := cas.Comm
- ir.SetPos(n)
- r := ir.NewIfStmt(base.Pos, nil, nil, nil)
- r.SetInit(cas.Init())
- var cond ir.Node
- switch n.Op() {
- default:
- base.Fatalf("select %v", n.Op())
-
- case ir.OSEND:
- // 调用selectnbsend(c, v)
- // if selectnbsend(c, v) { body } else { default body }
- n := n.(*ir.SendStmt)
- ch := n.Chan
- cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)
-
- case ir.OSELRECV2:
- n := n.(*ir.AssignListStmt)
- recv := n.Rhs[0].(*ir.UnaryExpr)
- ch := recv.X
- elem := n.Lhs[0]
- if ir.IsBlank(elem) { //空的话 elem= NodNil
- elem = typecheck.NodNil()
- }
- cond = typecheck.Temp(types.Types[types.TBOOL])
- // 调用 selectnbrecv
- fn := chanfn("selectnbrecv", 2, ch.Type())
- call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
- as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
- r.PtrInit().Append(typecheck.Stmt(as))
- }
-
- r.Cond = typecheck.Expr(cond)
- r.Body = cas.Body
- r.Else = append(dflt.Init(), dflt.Body...)
- return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
- }
- }
每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else
- // compiler implements
- //
- // select {
- // case c <- v:
- // ... foo
- // default:
- // ... bar
- // }
- //
- // as
- //
- // if selectnbsend(c, v) {
- // ... foo
- // } else {
- // ... bar
- // }
- //
- func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
- // block:false
- // chan将 select准换if-else
- return chansend(c, elem, false, getcallerpc())
- }
-
- // compiler implements
- //
- // select {
- // case v, ok = <-c:
- // ... foo
- // default:
- // ... bar
- // }
- //
- // as
- //
- // if selected, ok = selectnbrecv(&v, c); selected {
- // ... foo
- // } else {
- // ... bar
- // }
- //
- func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
- // block:false
- // chan将 select准换if-else
- return chanrecv(c, elem, false)
- }
- func main() {
- ch := make(chan int)
- go func() {
- tempArr := []int{1,2,3,4,5,6}
- for i := range tempArr {
- ch <- i
- }
- }()
- go func() {
- for {
- select {
- case i := <-ch:
- println("first: ", i)
- case i := <-ch:
- println("second", i)
- }
- }
- }()
- time.Sleep(3 * time.Second)
-
- }
可以看到多个case,会随机选取一个case执行
- func walkSelectCases(cases []*ir.CommClause) []ir.Node {
- ncas := len(cases)
- sellineno := base.Pos
- if dflt != nil {
- ncas--
- }
- // 定义casorder为ncas大小的case语句的数组
- casorder := make([]*ir.CommClause, ncas)
- // 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
- nsends, nrecvs := 0, 0
- // 多case编译后待执行的语句列表
- var init []ir.Node
-
- // generate sel-struct
- base.Pos = sellineno
- // 定义selv为长度为ncas的scase类型的数组
- // scasetype()函数返回的就是scase结构体,包含c和elem两个字段
- selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
- init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))
-
- // No initialization for order; runtime.selectgo is responsible for that.
- // 定义order为2倍的ncas长度的TUINT16类型的数组
- // 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
- order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))
-
- var pc0, pcs ir.Node
- if base.Flag.Race {
- pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
- pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
- } else {
- pc0 = typecheck.NodNil()
- }
-
- // register cases 遍历case生成scase对象放到selv中
- for _, cas := range cases {
- ir.SetPos(cas)
-
- init = append(init, ir.TakeInit(cas)...)
-
- n := cas.Comm
- if n == nil { // default:
- continue
- }
-
- var i int
- var c, elem ir.Node
- switch n.Op() { // 根据类型获取chan, elem的值
- default:
- base.Fatalf("select %v", n.Op())
- case ir.OSEND: // 发送chan类型,i从0开始递增
- n := n.(*ir.SendStmt)
- i = nsends
- nsends++
- c = n.Chan
- elem = n.Value
- case ir.OSELRECV2: // 接收chan,i从ncas开始递减
- n := n.(*ir.AssignListStmt)
- nrecvs++
- i = ncas - nrecvs
- recv := n.Rhs[0].(*ir.UnaryExpr)
- c = recv.X
- elem = n.Lhs[0]
- }
-
- casorder[i] = cas
- // 定义一个函数,写入c或elem到selv数组
- setField := func(f string, val ir.Node) {
- // 放到selv数组
- r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
- // 添加到带执行列表
- init = append(init, typecheck.Stmt(r))
- }
-
- c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
- setField("c", c)
- if !ir.IsBlank(elem) {
- elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
- setField("elem", elem)
- }
-
- // TODO(mdempsky): There should be a cleaner way to
- // handle this.
- if base.Flag.Race {
- r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
- init = append(init, r)
- }
- }
- // 如果发送chan和接收chan的个数不等于ncas,直接报错
- if nsends+nrecvs != ncas {
- base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
- }
-
- // run the select 开始执行select动作
- base.Pos = sellineno
- // 定义chosen, recvOK作为selectgo()函数的两个返回值
- // chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
- chosen := typecheck.Temp(types.Types[types.TINT])
- recvOK := typecheck.Temp(types.Types[types.TBOOL])
- r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
- r.Lhs = []ir.Node{chosen, recvOK}
- // 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
- fn := typecheck.LookupRuntime("selectgo")
- var fnInit ir.Nodes
- r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
- init = append(init, fnInit...)
- init = append(init, typecheck.Stmt(r))
-
- // selv and order are no longer alive after selectgo.
- // 执行完selectgo()函数后,销毁selv和order数组.
- init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
- init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
- if base.Flag.Race {
- init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
- }
-
- // dispatch cases
- //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
- dispatch := func(cond ir.Node, cas *ir.CommClause) {
- cond = typecheck.Expr(cond)
- cond = typecheck.DefaultLit(cond, nil)
-
- r := ir.NewIfStmt(base.Pos, cond, nil, nil)
-
- if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
- n := n.(*ir.AssignListStmt)
- if !ir.IsBlank(n.Lhs[1]) {
- x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
- r.Body.Append(typecheck.Stmt(x))
- }
- }
-
- r.Body.Append(cas.Body.Take()...)
- r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
- init = append(init, r)
- }
- // 如果多case中有default分支,并且chosen小于0,执行该default分支
- if dflt != nil {
- ir.SetPos(dflt)
- dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
- }
- // 如果有chosen选中的case分支,即chosen等于i,则执行该分支
- for i, cas := range casorder {
- ir.SetPos(cas)
- dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
- }
-
- return init
- }
从上面代码可以看出:
1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序
2- 遍历所有的case ,将case放到带执行列表(不包括default)
3- 调用runtime。selectgo并将selv和order作为入参传入selectgo
4- 根据selectgo返回的chosen来生成if语句,执行对应的case
加锁的顺序和解锁的顺序相反。
-
- func sellock(scases []scase, lockorder []uint16) {
- var c *hchan
- for _, o := range lockorder {
- c0 := scases[o].c
- if c0 != c {
- c = c0
- lock(&c.lock)
- }
- }
- }
-
- func selunlock(scases []scase, lockorder []uint16) {
- // 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
- //考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。
- //一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,
- //并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。
- for i := len(lockorder) - 1; i >= 0; i-- {
- c := scases[lockorder[i]].c
- if i > 0 && c == scases[lockorder[i-1]].c {
- continue // will unlock it on the next iteration
- }
- unlock(&c.lock)
- }
- }
selectgo 处理逻辑
- // cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
- // 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
- func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
- if debugSelect {
- print("select: cas0=", cas0, "\n")
- }
- //==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。
- // 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
- // NOTE: In order to maintain a lean stack size, the number of scases
- // is capped at 65536.
- cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
- order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
- // ncases个数 = 发送chan个数+ 接收chan个数
- ncases := nsends + nrecvs
- // scases是cas1数组的前ncases个元素
- scases := cas1[:ncases:ncases]
- // 顺序列表pollorder是order1的0- ncases个元素
- pollorder := order1[:ncases:ncases]
- // 加锁列表lockorder是order1的ncase到 2 ncases 个元素
- lockorder := order1[ncases:][:ncases:ncases]
- // NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。
- // Even when raceenabled is true, there might be select
- // statements in packages compiled without -race (e.g.,
- // ensureSigM in runtime/signal_unix.go).
- var pcs []uintptr
- if raceenabled && pc0 != nil {
- pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
- pcs = pc1[:ncases:ncases]
- }
- casePC := func(casi int) uintptr {
- if pcs == nil {
- return 0
- }
- return pcs[casi]
- }
-
- var t0 int64
- if blockprofilerate > 0 {
- t0 = cputicks()
- }
-
- // 生成排列顺序
- norder := 0
- for i := range scases {
- cas := &scases[i]
-
- // Omit cases without channels from the poll and lock orders.
- // 处理case中channel为空的情况
- if cas.c == nil {
- cas.elem = nil // 便于GC
- continue
- }
- // 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
- j := fastrandn(uint32(norder + 1))
- pollorder[norder] = pollorder[j]
- pollorder[j] = uint16(i)
- norder++
- }
- // 重新生成列表
- pollorder = pollorder[:norder]
- lockorder = lockorder[:norder]
-
- // 根据chan地址确定lockorder加锁排序列表的顺序
- // 简单的堆排序,以保证nlogn时间复杂度完成排序
- for i := range lockorder {
- j := i
- // 从轮询顺序开始,在同一channel上排序。
- c := scases[pollorder[i]].c
- for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
- k := (j - 1) / 2
- lockorder[j] = lockorder[k]
- j = k
- }
- lockorder[j] = pollorder[i]
- }
- for i := len(lockorder) - 1; i >= 0; i-- {
- o := lockorder[i]
- c := scases[o].c
- lockorder[i] = lockorder[0]
- j := 0
- for {
- k := j*2 + 1
- if k >= i {
- break
- }
- if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
- k++
- }
- if c.sortkey() < scases[lockorder[k]].c.sortkey() {
- lockorder[j] = lockorder[k]
- j = k
- continue
- }
- break
- }
- lockorder[j] = o
- }
-
- if debugSelect {
- for i := 0; i+1 < len(lockorder); i++ {
- if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
- print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
- throw("select: broken sort")
- }
- }
- }
-
- // 锁定select中涉及的所有channel
- sellock(scases, lockorder)
-
- var (
- gp *g
- sg *sudog
- c *hchan
- k *scase
- sglist *sudog
- sgnext *sudog
- qp unsafe.Pointer
- nextp **sudog
- )
-
- // === pass 1 - 查找可以等待处理的channel
- var casi int
- var cas *scase
- var caseSuccess bool
- var caseReleaseTime int64 = -1
- var recvOK bool
- for _, casei := range pollorder {
- casi = int(casei) // case的索引
- cas = &scases[casi]
- c = cas.c
- if casi >= nsends { // 处理接收channel的case
- sg = c.sendq.dequeue()
- if sg != nil {
- // 如果当前channel的sendq上有等待的goroutine,
- // 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置
- goto recv
- }
- if c.qcount > 0 {
- //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
- goto bufrecv
- }
- if c.closed != 0 {
- //如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;
- goto rclose
- }
- } else { // 处理发送channel的case
- if raceenabled {
- racereadpc(c.raceaddr(), casePC(casi), chansendpc)
- }
- if c.closed != 0 {
- // 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)
- goto sclose
- }
- sg = c.recvq.dequeue()
- if sg != nil {
- // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
- goto send
- }
- if c.qcount < c.dataqsiz {
- // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
- goto bufsend
- }
- }
- }
-
- if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回
- selunlock(scases, lockorder)
- casi = -1
- goto retc
- }
-
- // === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上
- gp = getg() // 获取当前的groutine
- if gp.waiting != nil {
- throw("gp.waiting != nil")
- }
- nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序
- for _, casei := range lockorder {
- casi = int(casei)
- cas = &scases[casi]
- c = cas.c
- sg := acquireSudog()
- // 获取sudog,将当前goroutine绑定到sudog上
- sg.g = gp
- sg.isSelect = true
- // 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。
- sg.elem = cas.elem
- sg.releasetime = 0
- if t0 != 0 {
- sg.releasetime = -1
- }
- sg.c = c
- // 按锁定顺序构建waiting list 。
- *nextp = sg
- nextp = &sg.waitlink
- // 加入相应等待队列
- if casi < nsends {
- c.sendq.enqueue(sg)
- } else {
- c.recvq.enqueue(sg)
- }
- }
-
- // 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
- gp.param = nil
- // Signal to anyone trying to shrink our stack that we're about
- // to park on a channel. The window between when this G's status
- // changes and when we set gp.activeStackChans is not safe for
- // stack shrinking.
- atomic.Store8(&gp.parkingOnChan, 1)
- // 挂起当前goroutine
- gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
- gp.activeStackChans = false
- // 加锁所有的channel
- sellock(scases, lockorder)
-
- gp.selectDone = 0
- sg = (*sudog)(gp.param)
- // param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
- gp.param = nil
-
- // === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
- //dequeue from unsuccessful chans
- // otherwise they stack up on quiet channels
- // record the successful case, if any.
- // We singly-linked up the SudoGs in lock order.
- // 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。
-
-
- casi = -1
- cas = nil
- caseSuccess = false
- // 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
- sglist = gp.waiting
- // 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
- for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
- sg1.isSelect = false
- sg1.elem = nil
- sg1.c = nil
- }
- // 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
- gp.waiting = nil
-
- for _, casei := range lockorder {
- k = &scases[casei]
- // 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
- if sg == sglist {
- // sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
- casi = int(casei)
- cas = k
- caseSuccess = sglist.success
- if sglist.releasetime > 0 {
- caseReleaseTime = sglist.releasetime
- }
- } else {
- // 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队
- c = k.c
- if int(casei) < nsends {
- c.sendq.dequeueSudoG(sglist)
- } else {
- c.recvq.dequeueSudoG(sglist)
- }
- }
- // 释放当前case的sudog,然后处理下一个case的sudog
- sgnext = sglist.waitlink
- sglist.waitlink = nil
- releaseSudog(sglist)
- sglist = sgnext
- }
-
- if cas == nil {
- throw("selectgo: bad wakeup")
- }
-
- c = cas.c
-
- if debugSelect {
- print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
- }
-
- if casi < nsends {
- if !caseSuccess {
- goto sclose
- }
- } else {
- recvOK = caseSuccess
- }
-
- if raceenabled {
- if casi < nsends {
- raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
- } else if cas.elem != nil {
- raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
- }
- }
- if msanenabled {
- if casi < nsends {
- msanread(cas.elem, c.elemtype.size)
- } else if cas.elem != nil {
- msanwrite(cas.elem, c.elemtype.size)
- }
- }
- if asanenabled {
- if casi < nsends {
- asanread(cas.elem, c.elemtype.size)
- } else if cas.elem != nil {
- asanwrite(cas.elem, c.elemtype.size)
- }
- }
-
- selunlock(scases, lockorder)
- goto retc
-
- bufrecv:
- // 能从buffer获取数据
- if raceenabled {
- if cas.elem != nil {
- raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
- }
- racenotify(c, c.recvx, nil)
- }
- if msanenabled && cas.elem != nil {
- msanwrite(cas.elem, c.elemtype.size)
- }
- if asanenabled && cas.elem != nil {
- asanwrite(cas.elem, c.elemtype.size)
- }
- recvOK = true
- qp = chanbuf(c, c.recvx)
- if cas.elem != nil {
- typedmemmove(c.elemtype, cas.elem, qp)
- }
- typedmemclr(c.elemtype, qp)
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.qcount--
- selunlock(scases, lockorder)
- goto retc
-
- bufsend:
- // 发送数据到缓存
- if raceenabled {
- racenotify(c, c.sendx, nil)
- raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
- }
- if msanenabled {
- msanread(cas.elem, c.elemtype.size)
- }
- if asanenabled {
- asanread(cas.elem, c.elemtype.size)
- }
- typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
- c.sendx++
- if c.sendx == c.dataqsiz {
- c.sendx = 0
- }
- c.qcount++
- selunlock(scases, lockorder)
- goto retc
-
- recv:
- // 从休眠sender(sg)接收
- recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
- if debugSelect {
- print("syncrecv: cas0=", cas0, " c=", c, "\n")
- }
- recvOK = true
- goto retc
-
- rclose:
- // 读取结束的channel
- selunlock(scases, lockorder)
- recvOK = false
- if cas.elem != nil {
- typedmemclr(c.elemtype, cas.elem)
- }
- if raceenabled {
- raceacquire(c.raceaddr())
- }
- goto retc
-
- send:
- // 想休眠的接收房发送数据
- if raceenabled {
- raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
- }
- if msanenabled {
- msanread(cas.elem, c.elemtype.size)
- }
- if asanenabled {
- asanread(cas.elem, c.elemtype.size)
- }
- send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
- if debugSelect {
- print("syncsend: cas0=", cas0, " c=", c, "\n")
- }
- goto retc
-
- retc:
- if caseReleaseTime > 0 {
- blockevent(caseReleaseTime-t0, 1)
- }
- return casi, recvOK
-
- sclose:
- // 向关闭的channel发送数据
- selunlock(scases, lockorder)
- panic(plainError("send on closed channel"))
- }
简单总结下select对case处理逻辑:
1- 空的case 会被golang监听到无法唤醒的协程,会panic
2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)
3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)
4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:
4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中
4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case
selectgo 函数:
1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)
2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒
3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。