导读:golang中的锁是通过CAS原子操作实现的,Mutex结构如下: type Mutex struct { ????state int32? ? ? ? ? ? ? ?? ????sema??uint32 } ? //state表示锁当前状态,每个位都有意义,零值表示未上锁 //sema用做信号量,通过PV操作从等待队列中阻塞/唤醒goroutine
type Mutex struct {
????state int32? ? ? ? ? ? ? ??
//sema用做信号量,通过PV操作从等待队列中阻塞/唤醒goroutine,等待锁的goroutine会挂到等待队列中,并且陷入睡眠不被调度,unlock锁时才唤醒。具体在sync/mutex.go Lock函数实现中。
type semaRoot struct {
????treap *sudog // root of balanced tree of unique waiters.
????nwait uint32 // Number of waiters. Read w/o the lock.
var semtable [semTabSize]struct {
????root semaRoot
????pad??[cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
1 让当前goroutine睡眠阻塞是通过goparkunlock实现的,在semacquire1中这样调用:
? ? ? ? ? 1)?root := semroot(addr)
? ? ? ? ? ? ? ? semroot中是通过信号量地址找到semaRoot结构
? ? ? ? ? 2) 略过一段..... 直接到使当前goroutine睡眠位置
? ? ? ? ? ? ? ? 首先lock(&root.lock)上锁
? ? ? ? ? ? ? ? 然后调用root.queue()让当前goroutine进入等待队列(注意一个信号量管理多个goroutine,goroutine睡眠前,本身的详细信息就要保存起来,放到队列中,也就是在挂到了semaRoot结构的treap上,看注释队列是用平衡树实现的?)
? ? ? ? ? 3)调用goparkunlock(&root.lock,waitReasonSemacquire,traceEvGoBlockSync,4)?
? ? ? ? ? ? ? ? 最后会调用到gopark,gopark会让系统重新执行一次调度,在重新调度之前,会将当前goroutine,即G对象状态置为sleep状态,不再被调度直到被唤醒,然后unlock锁,这个函数给了系统一个机会,将代码执行权限转交给runtime调度器,runtime会去调度别的goroutine。
2 既然阻塞,就需要有唤醒的机制
? ?唤醒机制是通过semtable结构
? ?sema.go并非专门为mutex锁中的设计的,在mutex中使用的话,是在其它goroutine释放Mutex时,调用的semrelease1,从队列中唤醒goroutine执行。详细没看。
? ?不过根据分析,Mutex是互斥锁,Mutex中的信号量应该是二值信号量,只有0和1。在Mutex中调用Lock,假如执行到semacquire1,从中判断信号量如果为0,就让当前goroutine睡眠,
func cansemacquire(addr *uint32) bool {
????for {
????????v := atomic.Load(addr)
????????if v == 0 {
????????????return false
????????if atomic.Cas(addr,v,v-1) {
????????????return true
? ? ? 如果不断有goroutine尝试获取Mutex锁,都会判断到信号量为0,会不断有goroutine陷入睡眠状态。只有当unlock时,信号量才会+1,当然不能重复执行unlock,所以这个信号量应该只为0和1。
state? ? ?|31|30|....|? ? ? 2? ? |? ? ?1? ? ? |? ? ? 0? ? ?|
? ? ? ? ? ? ? ? ? |?? ??? ??? ?? ? |?? ?? ? ? ?|? ? ? 第0位表示当前被加锁,0,unlock,? ?1 locked
?? ?? ? ? ? ? ? ? |? ? ? ? ? ? ? ? |? ? ? ? 是否有goroutine已被唤醒,0 唤醒, 1 没有
?? ??? ??? ?? ? ? |? ? ? ? ? ?这一位表示当前Mutex处于什么模式,两种模式,0 Normal? ?1 Starving
? ? ? ? ? ? ?第三位表示尝试Lock这个锁而等待的goroutine的个数

两种模式是为了锁的公平性而实现,摘取网上的一段翻译: http://www.voidcn.com/article/p-unrkzoaq-bsd.html
1. 是等待队列中的最后一个goroutine
2. 它的等待时间不超过1ms。
????// Fast path: grab unlocked mutex.
? ? // 1? 使用原子操作修改锁状态为locked
????if atomic.CompareAndSwapInt32(&m.state,mutexLocked) {
????????if race.Enabled {
1) 假设当前mutex处于初始状态,即m.state=0,那么当前goroutine会在这里会直接获取到锁,m.state变为locked,
则m.state = 00...001? ? ?上锁了,Not Woken,normal状态。?
? ? ? 运气好,一来就获取到,就跟上面说的一样,来时就在cpu里,又赶上锁没人占,天生自带光环,呵呵。
? ? ? Lock结束return
? ? ? 如果这个goroutine不释放锁,那么然后再来一个goroutine就锁不上了,进入第二步
2) 紧接着一个for循环,大概就是尝试获取锁,求而不得,就睡一会吧,等着被叫醒,醒了看看是不是等的时间太长饿了,饿了就进入starving,starving就会被优先调度了,没有那运气,就只能等了。
????var waitStartTime int64
????starving := false
????awoke := false
????iter := 0
????old := m.state? ? //刚才已经设置m.state=001,old也为001
????for {
????????// Don‘t spin in starvation mode,ownership is handed off to waiters
????????// so we won‘t be able to acquire the mutex anyway.
? ? ? ? // old=001,锁着呢
? ? ? ? // 然后runtime_canSpin看看能不能自旋啊,就是看传进来的iter,每次循环都是自增
? ? ? ? // 自旋条件:多核,GOMAXPROCS>1,至少有另外一个运行的P并且本地队列不空。或许是害怕单核自旋,程序都停了。另外最多自旋4次,iter为4时不会再进if
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?我们这里考虑多核的情况,会进if
? ? ? ? // old在每次if中会重新获取,这里自旋的目的就是等待锁释放,当前占用cpu的goroutine就可以占了,go里面总是尽量让在cpu中的goroutine占用锁
????????if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
????????????// Active spinning makes sense.
????????????// Try to set mutexWoken flag to inform Unlock
????????????// to not wake other blocked goroutines.
? ? ? ? ? ? // 当前awoke为false,但是没有goroutine在等待,那么unlock时,没必要唤醒队列goroutine。
????????????if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state,old,old|mutexWoken) {
????????????????awoke = true
????????????runtime_doSpin()? ? ?//自旋,执行没用的指令30次
????????????old = m.state? ? ? ? ? ?//old重新获取一次state值,如果有其它goroutine释放了,那么下次循环就不进if了
????????????continue? ? ? ? ? ? ? ? ? ?//自旋完再循环一次
?? ?? ? //if出来后,会有两种情况
?? ?? ? 2.1)其它goroutine? unlock了,上面if判断非Locked跳出,此时 m.state=000,old=000,awoke=false,没有goroutine在等待,这是最简单的情况了
????????new := old? ? ? ? ? ? ? ? ? //new=000,? ?old=000,? m.state=000,? awoke=false,这里初始化new,后面要设置锁状态,m.state设置为new
????????// Don‘t try to acquire starving mutex,new arriving goroutines must queue.
????????if old&mutexStarving == 0 {? ? ? //new=000,当前锁并不是starving模式,正在运行的goroutine要占用这个锁,如果是starving模式,当前的goroutine要去排队,把锁让给队列中快饿死的兄弟? ? ?
????????????new |= mutexLocked? ? ? ? ? ? ? //new=001, 要上锁
????????if old&(mutexLocked|mutexStarving) != 0 {? ? ? ?//old=000,当前正在跑的这个goroutine要占锁,不会进队列, new=001
????????????new += 1 << mutexWaiterShift
????????// The current goroutine switches mutex to starvation mode.
????????// But if the mutex is currently unlocked,don‘t do the switch.
????????// Unlock expects that starving mutex has waiters,which will not
????????// be true in this case.
????????if starving && old&mutexLocked != 0 {? ? ? ? ? ? ?//starving=false,只有goroutine在unlock唤醒后,发现等待时间过长,starving才设置为true,因为队列中其它的goroutine都等的有点长了,所以在锁可用时,优先给队列中的goroutine。这个逻辑在后面,当前不进这个if,new=001
????????????new |= mutexStarving
????????if awoke {? ? ? ? ? ? ? ? ? ? ? ?//awoke为false,不去唤醒等待队列, new仍为001
????????????// The goroutine has been woken from sleep,
????????????// so we need to reset the flag in either case.
????????????if new&mutexWoken == 0 {
????????????????throw("sync: inconsistent mutex state")
????????????new &^= mutexWoken
? ? ? ? ? ?至此new初始化完毕,new=001,要去更改Mutex的锁状态,真正独占锁了
? ? ? ? ? //保险起见,以防在new设置过程中,有其它goroutine更改了锁状态,原子性的设置当前锁状态为new=001,这里就是上锁
? ? ? ? ? if atomic.CompareAndSwapInt32(&m.state,new) {? ? ? ??
????????????if old&(mutexLocked|mutexStarving) == 0 {? ? ? ? ? ? ? ? ? ? ? ? ? ?//old=000,直接break,因为上面是将m.state置为上锁,已经成功了,至此后面逻辑不走了
????????????????break // locked the mutex with CAS? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //回头看2.1,我们如果是自旋次数够了跳出呢?如2.2逻辑
????????????// If we were already waiting before,queue at the front of the queue.
????????????queueLifo := waitStartTime != 0
????????????if waitStartTime == 0 {
????????????????waitStartTime = runtime_nanotime()
????????????starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
????????????old = m.state
????????????if old&mutexStarving != 0 {
????????????????// If this goroutine was woken and mutex is in starvation mode,
????????????????// ownership was handed off to us but mutex is in somewhat
????????????????// inconsistent state: mutexLocked is not set and we are still
????????????????// accounted as waiter. Fix that.
????????????????if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
????????????????????throw("sync: inconsistent mutex state")
????????????????delta := int32(mutexLocked - 1<<mutexWaiterShift)
????????????????if !starving || old>>mutexWaiterShift == 1 {
????????????????????// Exit starvation mode.
????????????????????// Critical to do it here and consider wait time.
????????????????????// Starvation mode is so inefficient,that two goroutines
????????????????????// can go lock-step infinitely once they switch mutex
????????????????????// to starvation mode.
????????????????????delta -= mutexStarving
????????????awoke = true
????????????iter = 0
????????} else {
????????????old = m.state
? ? ? ?2.2)new := old,? ? 此时new=001,old=001,m.state=001,awoke=false (awoke在if中设置为true的情况就不讨论了,太多了。。。。)
????????// Don‘t try to acquire starving mutex,new arriving goroutines must queue.
????????if old&mutexStarving == 0 {
????????????new |= mutexLocked? ? ? ? ? ? ? ? ? ? //new=001
????????if old&(mutexLocked|mutexStarving) != 0 {? ? //old=001,当前跑的这个goroutine要进队列,new的第3位到第31位表示队列中goroutine数量,这里+1
????????????new += 1 << mutexWaiterShift? ? ? ? ? ? ? ? ? //new=1001
????????// The current goroutine switches mutex to starvation mode.
????????// But if the mutex is currently unlocked,which will not
????????// be true in this case.
????????if starving && old&mutexLocked != 0 {? ? ? ? //starving=false,并不需要进入starving模式
????????????new |= mutexStarving
????????if awoke {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //awoke=false
????????????// The goroutine has been woken from sleep,
????????????// so we need to reset the flag in either case.
????????????if new&mutexWoken == 0 {
????????????????throw("sync: inconsistent mutex state")
????????????new &^= mutexWoken
? ? ? ? ? ? ? new初始化为1001, old=001
????????if atomic.CompareAndSwapInt32(&m.state,new) {
????????????if old&(mutexLocked|mutexStarving) == 0 {? ? ? ? ? ? ? ? ?//old=001,这里不会break,因为当前的goroutine拿不到锁需要阻塞睡眠
????????????????break // locked the mutex with CAS
????????????// If we were already waiting before,queue at the front of the queue.
????????????queueLifo := waitStartTime != 0? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //判断当前goroutine是不是for循环第一次走到这里,是的话,waitStartTime=0
????????????if waitStartTime == 0 {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //queueLifo的true还是false决定了goroutine入队列时,是排队还是插到队头
????????????????waitStartTime = runtime_nanotime()
????????????runtime_SemacquireMutex(&m.sema,queueLifo)? ? ? ? ? //当前goroutine入等待队列,跳到 “注脚1”,更多说明。此时goroutine会阻塞在这,锁释放,如果在队头,才会被唤醒。
????????????starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs? ? //唤醒时判断是否等待时间过长,超过了1ms,就设置starving为true,“注脚2”更多说明
????????????old = m.state
????????????if old&mutexStarving != 0 {
????????????????// If this goroutine was woken and mutex is in starvation mode,
????????????????// ownership was handed off to us but mutex is in somewhat
????????????????// inconsistent state: mutexLocked is not set and we are still
????????????????// accounted as waiter. Fix that.
????????????????if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
????????????????????throw("sync: inconsistent mutex state")
????????????????delta := int32(mutexLocked - 1<<mutexWaiterShift)
????????????????if !starving || old>>mutexWaiterShift == 1 {
????????????????????// Exit starvation mode.
????????????????????// Critical to do it here and consider wait time.
????????????????????// Starvation mode is so inefficient,delta)
????????????awoke = true
????????????iter = 0
????????} else {
????????????old = m.state
注脚1? ? 这的runtime_SemacquireMutex是对上面说的sema.go中semacquire1的简单封装,里面最后会调用goPark让当前goroutine让出执行权限给runtime,同时设置当前goroutine为睡眠状态,不参与调度(表现在程序上,就是阻在那了)。
注脚2? ? 1)?这也分两种情况,如果没有超1ms,starving=false
? ? ? ? ? ? ? ? ? ? ?old = m.state? ? ? ? ? ? ? //当前肯定是unlock了,当前goroutine才被唤醒了,所以old至少为000,我们假定为000
? ? ? ? ? ? ? ? ? ? ?if old&mutexStarving != 0? ? //old不是starving模式,不进if
? ? ? ?
? ? ? ? ? ? ? ? ? ?awoke = true? ? //充置awoke和iter,重新走循环
? ? ? ? ? ? ? ? ? ? iter = 0
? ? ? ? ? ? ? ? ? ? ?///////////////////////////
? ? ? ? ? ? ? ? ? ? ?下次循环中,最后会设置new=001,当前goroutine被唤醒,加锁1,不是starving状态。
? ? ? ? ? ? ? ? ? ? ?最后会在下面这break,跳出Lock函数
????????if atomic.CompareAndSwapInt32(&m.state,new) {
????????????if old&(mutexLocked|mutexStarving) == 0 {
????????????????break // locked the mutex with CAS
? ? ? ? ? ? 2)如果超了1ms,straving = true
? ? ? ? ? ? ? ? ?old = m.state? ? ? ? ? ? ? //当前肯定是unlock了,当前goroutine才被唤醒了,所以old至少为000,我们假定为000
? ? ? ? ? ? ? ? ? ?if old&mutexStarving != 0? ? //old不是starving模式,不进if
? ? ? ? ? ? ? ? ? ?awoke = true? ? //充置awoke和iter,重新走循环
? ? ? ? ? ? ? ? ? ?iter = 0
? ? ? ? ? ? ? ? ? ?///////////////////////////
? ? ? ? ? ? ? ? ?下次循环 new=101, 锁处于starving模式,当前goroutine被唤醒,已加锁
? ? 二? 如果处于starving会有什么影响?主要提现在Unlock函数中
????// Fast path: drop lock bit.
? ? //先清掉lock位,假设最简单的情况,其它位都为0,则m.state=000,new=000
????new := atomic.AddInt32(&m.state,-mutexLocked)
????if (new+mutexLocked)&mutexLocked == 0 {
????????throw("sync: unlock of unlocked mutex")
? ? //这里就是starving模式的影响,如果处于starving模式,那么直接走else,从队列头部唤醒一个goroutine。
????if new&mutexStarving == 0 {
????????old := new? ? ? ? ? ? ? ? ? ?//old = 000
????????for {
????????????// If there are no waiters or a goroutine has already
????????????// been woken or grabbed the lock,no need to wake anyone.
????????????// In starvation mode ownership is directly handed off from unlocking
????????????// goroutine to the next waiter. We are not part of this chain,
????????????// since we did not observe mutexStarving when we unlocked the mutex above.
????????????// So get off the way.
?? ??? ?? ? //如果队列中没有等待的goroutine或者有goroutine已经被唤醒并且抢占了锁(这种情况就如lock中,正好处在cpu中的goroutine在自旋,正好在unlock后,马上抢占了锁),那么就不需要wake等待队列了。
????????????if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
?? ??? ?? ??
?? ??? ?? ? //如果队列中有等着的,并且也没有处在cpu中的goroutine去自旋获取锁,那么就抓住机会从等待队列中唤醒一个goroutine。
????????????// Grab the right to wake someone.
????????????new = (old - 1<<mutexWaiterShift) | mutexWoken
????????????if atomic.CompareAndSwapInt32(&m.state,new) {
????????????old = m.state
????} else {
????????// Starving mode: handoff mutex ownership to the next waiter.
????????// Note: mutexLocked is not set,the waiter will set it after wakeup.
????????// But mutex is still considered locked if mutexStarving is set,
????????// so new coming goroutines won‘t acquire it.
? ? ? ? //starving模式,直接从队列头取goroutine唤醒。上面lock函数中没有分析runtime_SemacquireMutex(&m.sema,queueLifo)阻塞被唤醒后,如果lock处于是starving模式,会怎么样,这里分析一下,注脚3
注脚3? 首先在unlock函数开头即使清了lock位,cpu中的goroutine也不能获取到锁(因为判断m.state的starving位是饥饿模式,只能队列中等待的goroutine取获取锁,所以cpu中的goroutine会进入等待队列),那么在unlock函数中runtime_Semrelease(&m.sema,true)时,会唤醒队列中一个睡眠的goroutine。
????????????runtime_SemacquireMutex(&m.sema,queueLifo)? ? //在这被唤醒
????????????starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
????????????old = m.state? ? ? ? ? ? //old = 100
????????????if old&mutexStarving != 0 {? ? ? ? ? //lock处于starving中
????????????????// If this goroutine was woken and mutex is in starvation mode,
????????????????// ownership was handed off to us but mutex is in somewhat
????????????????// inconsistent state: mutexLocked is not set and we are still
????????????????// accounted as waiter. Fix that.
????????????????if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
????????????????????throw("sync: inconsistent mutex state")
????????????????delta := int32(mutexLocked - 1<<mutexWaiterShift)? ? ? ? ? ? ? ? //先将当前等待队列减一个
????????????????if !starving || old>>mutexWaiterShift == 1 {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //如果当前队列空了,就把starving清0了
????????????????????// Exit starvation mode.
????????????????????// Critical to do it here and consider wait time.
????????????????????// Starvation mode is so inefficient,delta)? ? ? //加锁跳出
1? 一个全局int变量,多核中一个goroutine读,一个写,没有更多操作,需不需要做原子操作。
? ?应该是不需要加的,intel P6处理器在硬件层面上是支持32位变量的load和store的原子性的。另外编译器对于变量的读或写也不会编译成多条指令。
2? ?一个全局int变量i,对于多核,两个协程都同时执行i++,需要原子操作吗?
? ? 需要的,对于i++,是典型的读改写操作,对于这样的操作,需要CAS原子操作保证原子性。
3? 对于一个map,写加原子操作,读要不要加
? ? 如果只是读或者写,并且值类型是整形的,应该是不需要atomic原子操作的,这里的意思是对于整形,不会出现写一半,或者读一半的情况,但是不可避免的,会出现这种情况,goroutine1对map写入1,goroutine2读到1,在处理的过程中,goroutine1又重新赋值。


