golang并发安全-select

前面说了golang的channel, 今天我们看看golang select 是怎么实现的。

数据结构

type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // 数据
}

select 非默认的case 中都是处理channel 的 接受和发送,所有scase 结构体中c是用来存储select 的case中使用的channel

处理流程

select case 场景

编译器在中间代码生成期间会根据 select 中 case 的不同对控制语句进行优化,这一过程都发生在cmd/compile/internal/walk/select.go 中,下面会根据不同的场景进行分析代码。

没有case

代码示例

func main() {
    select {}
}

如果是空的select语句,程序会被阻塞,golang 带有死锁监测机制:如果当前写成无法被唤醒,则会panic

源码解读

在runtime/select.go中可以看到:如果cases为空直接调用gopark函数以waitReasonSelectNoCases的原因挂起当前的协程,并且无法被唤醒,golang监测到直接panic。

同样我们在walk/select.go的walkSelectCases函数中可以看到,如果case为空直接调用runtime.block函数

只有一个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
	}()
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	}
}

如果有输入直接打印ch data : 1 , 没有的话会被检测出all goroutines are asleep - deadlock!(和没有case的一样)

源码解读

如果一个非default case ,将读写转换成 ch <- 或 <- ch, 正常的channel读写

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	// optimization: one-case select: single op.
	if ncas == 1 {
		cas := cases[0] //获取case
		ir.SetPos(cas)
		l := cas.Init()
		if cas.Comm != nil { // 不是默认
			n := cas.Comm // 获取case的条件语句
			l = append(l, ir.TakeInit(n)...)
			switch n.Op() {
			default:
				base.Fatalf("select %v", n.Op())

			case ir.OSEND: // 如果是 send, 无须处理
				// already ok

			case ir.OSELRECV2:
				r := n.(*ir.AssignListStmt)
                // 如果不是 data, ok := <- ch 类型,处理成<- ch
				if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
					n = r.Rhs[0]
					break
				}
                // 是的话, op设置成data, ok := <- ch形式
				r.SetOp(ir.OAS2RECV)
			}

			l = append(l, n)
		}
        // 将case 条件后要执行的语句加入带执行的列表
		l = append(l, cas.Body...)
        // 加入 break类型,跳出select-case 
		l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		return l
	}
    // convert case value arguments to addresses.
	// this rewrite is used by both the general code and the next optimization.
	var dflt *ir.CommClause
	for _, cas := range cases {
		ir.SetPos(cas)
		n := cas.Comm
		if n == nil {
			dflt = cas
			continue
		}
		switch n.Op() {
		case ir.OSEND:
			n := n.(*ir.SendStmt)
			n.Value = typecheck.NodAddr(n.Value)
			n.Value = typecheck.Expr(n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[0]) {
				n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
				n.Lhs[0] = typecheck.Expr(n.Lhs[0])
			}
		}
	}
}

两个case(一个default)

代码示例

func main() {
	ch := make(chan int)
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	default:
		fmt.Println("default")
	}
}

如果写入就走<- 读取,反之走默认

源码解读

如果是两个case,其中一个是default,非default的会根据send还是recv 调用channel的selectnbsend和 selectnbrecv。这两个方法是非阻塞的

func walkSelectCases(cases []*ir.CommClause) []ir.Node){
	// optimization: two-case select but one is default: single non-blocking op.
	if ncas == 2 && dflt != nil {
		cas := cases[0]
		if cas == dflt { // 如果是default 放在 cases[1]
			cas = cases[1]
		}

		n := cas.Comm
		ir.SetPos(n)
		r := ir.NewIfStmt(base.Pos, nil, nil, nil)
		r.SetInit(cas.Init())
		var cond ir.Node
		switch n.Op() {
		default:
			base.Fatalf("select %v", n.Op())

		case ir.OSEND:
            // 调用selectnbsend(c, v)
			// if selectnbsend(c, v) { body } else { default body }
			n := n.(*ir.SendStmt)
			ch := n.Chan
			cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			recv := n.Rhs[0].(*ir.UnaryExpr)
			ch := recv.X
			elem := n.Lhs[0]
			if ir.IsBlank(elem) { //空的话 elem= NodNil
				elem = typecheck.NodNil()
			}
			cond = typecheck.Temp(types.Types[types.TBOOL])
            // 调用 selectnbrecv
			fn := chanfn("selectnbrecv", 2, ch.Type())
			call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
			as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
			r.PtrInit().Append(typecheck.Stmt(as))
		}

		r.Cond = typecheck.Expr(cond)
		r.Body = cas.Body
		r.Else = append(dflt.Init(), dflt.Body...)
		return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
	}    
}

每次尝试从channel读/写值,如果不成功则直接返回,不会阻塞。从selectnbsend和selectnbrecv看出,最后转换成if-else

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // block:false
    // chan将 select准换if-else
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	// block:false
    // chan将 select准换if-else
    return chanrecv(c, elem, false)
}

多个case

代码示例

func main() {
	ch := make(chan int)
	go func() {
		tempArr := []int{1,2,3,4,5,6}
		for i := range tempArr {
			ch <- i
		}
	}()
	go func() {
		for {
			select {
			case i := <-ch:
				println("first: ", i)
			case i := <-ch:
				println("second", i)
			}
		}
	}()
	time.Sleep(3 * time.Second)

}

可以看到多个case,会随机选取一个case执行

源码解读

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	ncas := len(cases)
	sellineno := base.Pos
	if dflt != nil {
		ncas--
	}
    // 定义casorder为ncas大小的case语句的数组
	casorder := make([]*ir.CommClause, ncas)
    // 分别定义nsends为发送channel的case个数,nrecvs为接收channel的case个数
	nsends, nrecvs := 0, 0
    // 多case编译后待执行的语句列表
	var init []ir.Node

	// generate sel-struct
	base.Pos = sellineno
    // 定义selv为长度为ncas的scase类型的数组
    // scasetype()函数返回的就是scase结构体,包含c和elem两个字段
	selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
	init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

	// No initialization for order; runtime.selectgo is responsible for that.
	// 定义order为2倍的ncas长度的TUINT16类型的数组
    // 注意:selv和order作为runtime.selectgo()函数的入参,前者存放scase列表内存地址,后者用来做scase排序使用,排序是为了便于挑选出待执行的case
    order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))

	var pc0, pcs ir.Node
	if base.Flag.Race {
		pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
		pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
	} else {
		pc0 = typecheck.NodNil()
	}

	// register cases 遍历case生成scase对象放到selv中
	for _, cas := range cases {
		ir.SetPos(cas)

		init = append(init, ir.TakeInit(cas)...)

		n := cas.Comm
		if n == nil { // default:
			continue
		}

		var i int
		var c, elem ir.Node
		switch n.Op() { // 根据类型获取chan, elem的值
		default:
			base.Fatalf("select %v", n.Op())
		case ir.OSEND: // 发送chan类型,i从0开始递增
			n := n.(*ir.SendStmt)
			i = nsends
			nsends++
			c = n.Chan
			elem = n.Value
		case ir.OSELRECV2: // 接收chan,i从ncas开始递减
			n := n.(*ir.AssignListStmt)
			nrecvs++
			i = ncas - nrecvs
			recv := n.Rhs[0].(*ir.UnaryExpr)
			c = recv.X
			elem = n.Lhs[0]
		}

		casorder[i] = cas
       // 定义一个函数,写入c或elem到selv数组
		setField := func(f string, val ir.Node) {
            // 放到selv数组
			r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
            // 添加到带执行列表
			init = append(init, typecheck.Stmt(r))
		}

		c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
		setField("c", c)
		if !ir.IsBlank(elem) {
			elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
			setField("elem", elem)
		}

		// TODO(mdempsky): There should be a cleaner way to
		// handle this.
		if base.Flag.Race {
			r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
			init = append(init, r)
		}
	}
    // 如果发送chan和接收chan的个数不等于ncas,直接报错
	if nsends+nrecvs != ncas {
		base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
	}

	// run the select  开始执行select动作
	base.Pos = sellineno
    // 定义chosen, recvOK作为selectgo()函数的两个返回值
    // chosen 表示被选中的case的索引,recvOK表示对于接收操作,是否成功接收
	chosen := typecheck.Temp(types.Types[types.TINT])
	recvOK := typecheck.Temp(types.Types[types.TBOOL])
	r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
	r.Lhs = []ir.Node{chosen, recvOK}
    // 调用runtime.selectgo()函数作为运行时实际执行多case的select动作的函数
	fn := typecheck.LookupRuntime("selectgo")
	var fnInit ir.Nodes
	r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
	init = append(init, fnInit...)
	init = append(init, typecheck.Stmt(r))

	// selv and order are no longer alive after selectgo.
    // 执行完selectgo()函数后,销毁selv和order数组.
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
	if base.Flag.Race {
		init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
	}

	// dispatch cases 
    //定义一个函数,根据chosen确定的case分支生成if语句,执行该分支的语句
	dispatch := func(cond ir.Node, cas *ir.CommClause) {
		cond = typecheck.Expr(cond)
		cond = typecheck.DefaultLit(cond, nil)

		r := ir.NewIfStmt(base.Pos, cond, nil, nil)

		if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[1]) {
				x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
				r.Body.Append(typecheck.Stmt(x))
			}
		}

		r.Body.Append(cas.Body.Take()...)
		r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		init = append(init, r)
	}
    // 如果多case中有default分支,并且chosen小于0,执行该default分支
	if dflt != nil {
		ir.SetPos(dflt)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
	}
    // 如果有chosen选中的case分支,即chosen等于i,则执行该分支
	for i, cas := range casorder {
		ir.SetPos(cas)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
	}

	return init
}

从上面代码可以看出:

1- 初始化过程: 生成scase数组,定义selv 存放scase数组内存地址,定义order 来给scase排序

2- 遍历所有的case ,将case放到带执行列表(不包括default)

3- 调用runtime。selectgo并将selv和order作为入参传入selectgo

4- 根据selectgo返回的chosen来生成if语句,执行对应的case

解锁加锁

加锁的顺序和解锁的顺序相反。


func sellock(scases []scase, lockorder []uint16) {
	var c *hchan
	for _, o := range lockorder {
		c0 := scases[o].c
		if c0 != c {
			c = c0
			lock(&c.lock)
		}
	}
}

func selunlock(scases []scase, lockorder []uint16) {
	// 我们必须非常小心,在解锁最后一把锁后不要触摸sel,因为sel可以在最后一次解锁后立即释放。
    //考虑以下情况。第一个M调用runtime·park()在runtime·selectgo()中传递sel。
    //一旦runtime·park()解锁了最后一个锁,另一个M会使调用select的G再次可运行,
    //并安排其执行。当G在另一个M上运行时,它锁定所有锁并释放sel。现在,如果第一个M触摸sel,它将访问释放的内存。
	for i := len(lockorder) - 1; i >= 0; i-- {
		c := scases[lockorder[i]].c
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
		unlock(&c.lock)
	}
}

selectgo

selectgo 处理逻辑

// cas0指向[ncases]scase类型的数组,order0指向[2*ncases]uint16类型的数组(其中ncases必须<=65536)。
// 返回值有两个, chosen 和 recvOK,分别表示选中的case的序号,和对接收操作是否接收成功的布尔值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}
    //==== 执行必要的初始化操作,并生成处理case的两种顺序:轮询顺序polIorder和加锁顺序lockorder。
  // 为了将scase分配到栈上,这里直接给cas1分配了64KB大小的数组,同理, 给order1分配了128KB大小的数组
	// NOTE: In order to maintain a lean stack size, the number of scases
	// is capped at 65536.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // ncases个数 = 发送chan个数+ 接收chan个数
	ncases := nsends + nrecvs
    // scases是cas1数组的前ncases个元素
	scases := cas1[:ncases:ncases]
    // 顺序列表pollorder是order1的0- ncases个元素
	pollorder := order1[:ncases:ncases]
    // 加锁列表lockorder是order1的ncase到 2 ncases 个元素
	lockorder := order1[ncases:][:ncases:ncases]
	// NOTE: 编译器初始化的pollorder/lockorder的基础数组不是零。
	// Even when raceenabled is true, there might be select
	// statements in packages compiled without -race (e.g.,
	// ensureSigM in runtime/signal_unix.go).
	var pcs []uintptr
	if raceenabled && pc0 != nil {
		pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
		pcs = pc1[:ncases:ncases]
	}
	casePC := func(casi int) uintptr {
		if pcs == nil {
			return 0
		}
		return pcs[casi]
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

    // 生成排列顺序
    norder := 0
	for i := range scases {
		cas := &scases[i]

		// Omit cases without channels from the poll and lock orders.
        // 处理case中channel为空的情况
		if cas.c == nil {
			cas.elem = nil // 便于GC
			continue
		}
       // 通过fastrandn函数引入随机性,确定pollorder列表中case的随机顺序索引
		j := fastrandn(uint32(norder + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	// 重新生成列表
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

    // 根据chan地址确定lockorder加锁排序列表的顺序
    // 简单的堆排序,以保证nlogn时间复杂度完成排序
	for i := range lockorder {
		j := i
        // 从轮询顺序开始,在同一channel上排序。
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	for i := len(lockorder) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}

	if debugSelect {
		for i := 0; i+1 < len(lockorder); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}

    // 锁定select中涉及的所有channel
	sellock(scases, lockorder)

	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

	// === pass 1 - 查找可以等待处理的channel
	var casi int
	var cas *scase
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool
	for _, casei := range pollorder {
		casi = int(casei) // case的索引
		cas = &scases[casi]  
		c = cas.c
		if casi >= nsends { // 处理接收channel的case
			sg = c.sendq.dequeue()
			if sg != nil {
                // 如果当前channel的sendq上有等待的goroutine,
                // 跳到recv代码 并从缓冲区读取数据后将等待goroutine中的数据放入到缓冲区中相同的位置
				goto recv
			}
			if c.qcount > 0 {
                //如果当前channel的缓冲区不为空,就会跳到bufrecv标签处从缓冲区获取数据;
				goto bufrecv
			}
			if c.closed != 0 {
                //如果当前channel已经被关闭,就会跳到rclose读取末尾数据和收尾工作;
				goto rclose
			}
		} else { // 处理发送channel的case
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			if c.closed != 0 {
                // 如果当前channel已经被关闭就会直接跳到sclose标签(panic中止程序)
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
                // 如果当前channel的recvq上有等待的goroutine,就会跳到 send标签向channel发送数据;
				goto send
			}
			if c.qcount < c.dataqsiz {
                // 如果当前channel的缓冲区存在空闲位置,就会将待发送的数据存入缓冲区;
				goto bufsend
			}
		}
	}

	if !block { // 如果是非阻塞,即包含default分支,解锁所有channel并返回
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}

	// === pass 2 - 将当前goroutine根据需要挂在chan的sendq或recvq上
	gp = getg() // 获取当前的groutine
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting // 正在等待的sudog结构;按锁定顺序
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
        // 获取sudog,将当前goroutine绑定到sudog上
		sg.g = gp
		sg.isSelect = true
        // 在分配elem和在gp.waiting上排队sg之间没有堆栈分割,copystack可以找到它。
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
        // 按锁定顺序构建waiting list 。
		*nextp = sg
		nextp = &sg.waitlink
        // 加入相应等待队列
		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	// 被唤醒后会根据 param 来判断是否是由 close 操作唤醒的,所以先置为 nil
	gp.param = nil
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
    // 挂起当前goroutine
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	gp.activeStackChans = false
    // 加锁所有的channel
	sellock(scases, lockorder)

	gp.selectDone = 0
	sg = (*sudog)(gp.param)
     // param 存放唤醒 goroutine 的 sudog,如果是关闭操作唤醒的,那么就为 nil
	gp.param = nil

	// === pass 3 - 当前 Goroutine 被唤醒之后找到满足条件的 Channel 并进行处理
    //dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
    // 从不成功的通道中退出队列,否则它们会堆积在安静的通道上,记录成功的案例(如果有的话)。我们单独将SudoG按锁定顺序连接起来。


	casi = -1
	cas = nil
	caseSuccess = false
    // 当前goroutine 的 waiting 链表按照lockorder顺序存放着case的sudog
	sglist = gp.waiting
    // 在从 gp.waiting 取消case的sudog链接之前清除所有元素,便于GC
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
    // 清楚当前goroutine的waiting链表,因为被sg代表的协程唤醒了
	gp.waiting = nil

	for _, casei := range lockorder {
		k = &scases[casei]
        // 如果相等说明,goroutine是被当前case的channel收发操作唤醒的
		if sg == sglist {
            // sg唤醒了当前goroutine, 则当前G已经从sg的队列中出队,这里不需要再次出队
			casi = int(casei)
			cas = k
			caseSuccess = sglist.success
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else {
            // 不是此case唤醒当前goroutine, 将goroutine从case对应的队列(发送或接收)出队
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
         // 释放当前case的sudog,然后处理下一个case的sudog
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		throw("selectgo: bad wakeup")
	}

	c = cas.c

	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	}

	if casi < nsends {
		if !caseSuccess {
			goto sclose
		}
	} else {
		recvOK = caseSuccess
	}

	if raceenabled {
		if casi < nsends {
			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
		} else if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
	}
	if msanenabled {
		if casi < nsends {
			msanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		}
	}
	if asanenabled {
		if casi < nsends {
			asanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			asanwrite(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
    // 能从buffer获取数据
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
		racenotify(c, c.recvx, nil)
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if asanenabled && cas.elem != nil {
		asanwrite(cas.elem, c.elemtype.size)
	}
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// 发送数据到缓存
	if raceenabled {
		racenotify(c, c.sendx, nil)
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
    // 从休眠sender(sg)接收
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true
	goto retc

rclose:
    // 读取结束的channel
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send:
	// 想休眠的接收房发送数据
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK

sclose:
	// 向关闭的channel发送数据
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

总结

简单总结下select对case处理逻辑:

1- 空的case 会被golang监听到无法唤醒的协程,会panic

2- 如果只有一个case, 根据操作类型转换成 <- ch 或 成ch <- () (会跳用channel 的 chansend , chanrecv)

3- 如果一个default 一个非default 的case,非default会走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后转换成if-else 语句)

4- 多个case 的情况下, cmd/compile/internal/walk/select.go 优化程序中:

4.1 对 scase 数组, selv ,order数组初始化,将case放在带执行列表中

4.2 调用selectgo函数,根据返回的chosen 结果来生成if语句,执行对应的case

selectgo 函数:

1- 随机生成一个便利case 的 轮询 poollorder, 根据channel 地址生成一个枷锁顺序的lockorder。(随机顺序保证公平性,加锁顺序能够避免思索)

2- 根据pollorder顺序查找cases是否包含立即处理的chan, 如果有就处理。没有处理的话,创建 sudo 结构,将当前的G 加入各case的channel 对应的 接收发送队列,等待其他G唤醒

3- 当调度器 唤醒当前的G,会按照lockorder ,访问所有的case。从中找到需要处理的case进行读写处理,同时从所有的case 的发送姐搜队列中移除当前的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/307495.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

在AWS云上面创建Developers用户组

问题 需要给开发人员创建一个专门的Developers用户组&#xff0c;保证开发人员只能够尽兴相关操作。注意&#xff0c;我这里使用的AWS国际版。 创建Developers用户组 打开用户组页面&#xff0c;点击用户组&#xff0c;创建组&#xff0c;进行用户组创建&#xff0c;如下图&…

哪种台灯适合备考用?2024值得入手的护眼台灯推荐

现在孩子的近视率越来越高了&#xff0c;也有很多家长开始意识到孩子用眼健康的重要性。而台灯作为守护孩子用眼环境的必备用品&#xff0c;很多家长想给孩子购置一款护眼灯&#xff0c;却看见市面琳琅满目的款式根本不知道怎么购买&#xff0c;害怕买到劣质的护眼台灯&#xf…

SpringBoot+SSM项目实战 苍穹外卖(09) day9作业

继续上一节的内容&#xff0c;本节是作业课&#xff0c;要求独立完成&#xff1a;用户端历史订单模块、商家端订单管理模块相关业务新功能开发和已有功能优化。 目录 作业要求用户端历史订单模块查询历史订单查询订单详情取消订单再来一单 商家端订单管理模块订单搜索各个状态的…

每天刷两道题——第十一天

1.1滑动窗口最大值 给你一个整数数组 nums&#xff0c;有一个大小为 k 的滑动窗口从数组的最左侧移动到数组的最右侧。你只可以看到在滑动窗口内的 k 个数字。滑动窗口每次只向右移动一位。返回滑动窗口中的最大值 。 输入&#xff1a;nums [1,3,-1,-3,5,3,6,7], k 3 输出&…

爆肝整理,性能测试-场景设计/性能调优总结,一篇概全...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、性能测试场景设…

密钥管理机制如何进行工作

密钥管理机制是信息安全领域中一个至关重要的环节&#xff0c;其目标是确保密钥的安全传输、存储和使用&#xff0c;从而保障整个系统的安全性和可靠性。在实际工作中&#xff0c;密钥管理机制涉及到多个方面的技术和方法&#xff0c;下面将详细介绍其工作原理和过程。 密钥管理…

Navicat迁移局域网内其他PC机的MySQL数据库

迁移局域网内其他PC机的MySQL数据库到本机 查看局域网IP 设置可远程连接的账号 开放本机防火墙的3306端口 连接PC机的MySQL 利用Navicat迁移数据库 刚换了个电脑&#xff0c;旧电脑的MySQL数据库太多了&#xff0c;转成.sql文件&#xff0c;再传输到新电脑上运行&#xff…

python24.1.10创造购物清单

数据结构-列表 在列表里额外加东西 删除列表中某个元素 列表可包含多种类型的数据 统计列表中元素数量 列表索引 针对列表的函数 利用索引赋值可以直接覆盖本来元素 实践

重置 Docker 中 Gitlab 的账号密码

1、首先进入Docker容器 docker exec -it gitlab bash 2、连接到 gitlab 的数据库 需要谨慎操作 gitlab-rails console -e production 等待加载完后会进入控制台 ------------------------------------------------------------------------------------------------------…

2022 年全国职业院校技能大赛高职组云计算赛项试卷部分解析

2022 年全国职业院校技能大赛高职组云计算赛项试卷部分解析 【赛程名称】高职组-云计算赛项第一场-私有云【任务 1】私有云服务搭建[10 分]【题目 2】Yum 源配置[0.5 分]【题目 3】配置无秘钥 ssh[0.5 分]【题目 4】基础安装[0.5 分]【题目 5】数据库安装与调优[0.5 分]【题目 …

springboot程序启动慢解决

记springboot程序启动慢解决。 今天将程序发给别人后&#xff0c;别人立马说你这个启动很慢。 查看程序启动耗时分布 <!--启动耗时监测--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator…

WebGL开发实验设备与操作演示

在开发实验设备模型与操作演示的WebGL应用时&#xff0c;你需要考虑以下步骤和关键要点&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.模型创建与导入&#xff1a; 利用3D建模工具&#xff08;如…

基于ssm的政府项目管理平台+vue论文

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本政府项目管理平台就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…

C# WPF 数据绑定

需求 后台变量发生改变&#xff0c;前端对应的相关属性值也发生改变 实现 接口 INotifyPropertyChanged 用于通知客户端&#xff08;通常绑定客户端&#xff09;属性值已更改。 示例 示例一 官方示例代码如下 using System; using System.Collections.Generic; using Sy…

小型图书借阅管理系统

springbootmybatismysqlthymeleafjquery构建的小型图书借阅管理系统后端 1.springboot 2.mybatis数据库 1.mysql前端 1.jquery 2.jquery-validate 3.htmlcss

欢乐的周末 - 华为OD统一考试

OD统一考试 题解&#xff1a; Java / Python / C 题目描述 小华和小为是很要好的朋友&#xff0c;他们约定周末一起吃饭。 通过手机交流&#xff0c;他们在地图上选择了多个聚餐地点(由于自然地形等原因&#xff0c;部分聚餐地点不可达)。求小华和小为都能到达的聚餐地点有多…

面试宝典之spring框架常见面试题

F1、类的反射机制有啥用&#xff1f; &#xff08;1&#xff09;增加程序的灵活性&#xff0c;可扩展性&#xff0c;动态创建对象。 &#xff08;2&#xff09;框架必备&#xff0c;任何框架的封装都要用反射。&#xff08;框架的灵魂&#xff09; F2、获取Class对象的三种方…

Windows解决.conda文件夹占用C盘空间过大的问题

背景&#xff1a;C盘空间被.conda文件占用16G&#xff0c;主要原因是里面存放了python环境&#xff0c;提前进行环境迁移&#xff0c;防止后面环境增长C盘空间不足 解决办法&#xff1a; 1. .conda文件备份 2. 将.conda文件夹中的envs内容复制到Anaconda的安装目录下D:\Softwa…

借助AI识别网关实现高空坠物监测预警

高空坠物危害着民众的人身财产安全&#xff0c;有些高空坠物是来源于违法的高空抛物行为&#xff0c;也有一些是来源于高层建筑施工不规范、建筑设施老化、意外事故等因素。 无论哪种类型的高空坠物&#xff0c;都可以借助AI智能网关搭建坠物识别监测预警系统&#xff0c;实现对…
最新文章