sync.Cond Basic Concepts

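sync.Cond is a condition variable. Its main job is to let a group of goroutines wait until a particular condition occurs, and to notify them when the condition is met.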

With sync.Cond, a goroutine can tell other goroutines that they may wake up. It can also block itself and give up the CPU until it is notified.

sync.Cond is essentially an extension of a Mutex or RWMutex. Internally it always holds an object that implements the Locker interface.

Whenever the condition our business logic cares about becomes true, we can tell the other goroutines that they can stop blocking and start working.
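Here is a minimal sketch of that idea. The shared `ready` flag and the `runReadyDemo` helper are hypothetical names chosen for illustration. One goroutine waits for the flag while holding the lock. The main goroutine sets the flag under the lock and then calls Signal.

```go
package main

import (
	"fmt"
	"sync"
)

// runReadyDemo starts one waiter that blocks until ready becomes true,
// then sets ready and signals it. It returns the value the waiter observed.
func runReadyDemo() bool {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false // hypothetical shared condition
	observed := make(chan bool)

	go func() {
		cond.L.Lock()
		for !ready { // re-check the condition after every wake-up
			cond.Wait() // releases mu while blocked, reacquires it before returning
		}
		v := ready
		cond.L.Unlock()
		observed <- v
	}()

	cond.L.Lock()
	ready = true // change the condition only while holding the lock
	cond.L.Unlock()
	cond.Signal() // wake one waiting goroutine, if there is one

	return <-observed
}

func main() {
	fmt.Println("waiter saw ready =", runReadyDemo())
}
```

The waiter might start after `ready` has already been set. In that case it never calls Wait, which is why the condition is checked in a loop rather than assumed.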

How sync.Cond Is Implemented

type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList // notification queue of waiting goroutines
	checker copyChecker
}

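sync.Cond has three main methods: Wait, Signal and Broadcast. The two methods of the Locker interface, Lock and Unlock, are reached through its L field by composition, for example c.L.Lock().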
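L is just a Locker, so it does not have to be a plain Mutex. The sketch below passes `rw.RLocker()` to NewCond. Waiters then take only the read lock while they check the condition. The writer takes the exclusive lock to change it and then calls Broadcast. The `version` counter and the `waitForVersion` helper are hypothetical names used for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForVersion starts n readers that each wait until version >= target,
// then sets version to target under the write lock and broadcasts.
// It returns how many readers observed the new version.
func waitForVersion(n, target int) int {
	var rw sync.RWMutex
	cond := sync.NewCond(rw.RLocker()) // c.L.Lock/Unlock map to RLock/RUnlock
	version := 0                       // hypothetical shared state

	results := make(chan int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cond.L.Lock() // RLock
			for version < target {
				cond.Wait() // RUnlock while parked, RLock again after waking
			}
			results <- version
			cond.L.Unlock() // RUnlock
		}()
	}

	rw.Lock() // exclusive: no reader is inside its critical section now
	version = target
	rw.Unlock()
	cond.Broadcast() // wake every reader parked in Wait

	wg.Wait()
	close(results)
	seen := 0
	for v := range results {
		if v == target {
			seen++
		}
	}
	return seen
}

func main() {
	fmt.Println(waitForVersion(3, 1), "readers saw version 1")
}
```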

notifyList

notifyList is the core data structure behind sync.Cond. It is essentially a queue of waiting goroutines. It is designed to:

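  • Record which goroutines are waiting.
  • Make Signal wake the goroutine that has waited longest, meaning the one holding the smallest unnotified ticket (FIFO order).
  • Let part of Wait run without holding the list's lock (taking a ticket is a single atomic increment), which reduces contention.

type notifyList struct {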
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait atomic.Uint32 // ticket that the next waiter will receive

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32 // ticket of the next goroutine to be woken

	// List of parked waiters.
	lock mutex  // protects the linked list
	head *sudog // head of the list
	tail *sudog // tail of the list
}

wait (ticket counter): each goroutine that calls Wait() takes the current value as its ticket, and the counter is atomically incremented.

notify: the ticket of the next goroutine to be notified. Every ticket smaller than notify has already been notified.

sudog: the Go runtime's wrapper around a goroutine, used to link the goroutine into a wait queue.
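To make the wait/notify ticket scheme concrete, here is a user-space model of it. This is a simplified sketch, not the runtime code. The type `ticketList` and its methods are hypothetical. A per-ticket channel stands in for a parked sudog, and a map replaces the linked list.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ticketList is a hypothetical user-space model of runtime.notifyList.
type ticketList struct {
	wait   atomic.Uint32            // next ticket to hand out
	mu     sync.Mutex               // protects notify and parked
	notify uint32                   // next ticket to be notified
	parked map[uint32]chan struct{} // stands in for the sudog list
}

func newTicketList() *ticketList {
	return &ticketList{parked: make(map[uint32]chan struct{})}
}

// less reports whether ticket a comes before ticket b, tolerating wraparound.
func less(a, b uint32) bool { return int32(a-b) < 0 }

// add hands out a ticket; like notifyListAdd it needs no lock.
func (l *ticketList) add() uint32 { return l.wait.Add(1) - 1 }

// waitFor blocks until ticket t has been notified.
func (l *ticketList) waitFor(t uint32) {
	l.mu.Lock()
	if less(t, l.notify) { // already notified: do not park
		l.mu.Unlock()
		return
	}
	ch := make(chan struct{})
	l.parked[t] = ch
	l.mu.Unlock()
	<-ch // "park" until notified
}

// notifyOne wakes the waiter holding the oldest unnotified ticket.
func (l *ticketList) notifyOne() {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.notify
	if t == l.wait.Load() { // nobody is waiting
		return
	}
	l.notify = t + 1
	if ch, ok := l.parked[t]; ok { // missing if the waiter has not parked yet
		delete(l.parked, t)
		close(ch)
	}
}

// notifyAll marks every ticket handed out so far as notified.
func (l *ticketList) notifyAll() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.notify = l.wait.Load()
	for t, ch := range l.parked {
		delete(l.parked, t)
		close(ch)
	}
}

func main() {
	l := newTicketList()
	t0, t1 := l.add(), l.add()
	l.notifyOne() // notifies t0 before it ever parks
	l.waitFor(t0) // returns immediately because less(t0, notify)
	go l.notifyAll()
	l.waitFor(t1) // returns once notifyAll has run
	fmt.Println("tickets", t0, t1, "both notified")
}
```

Because "notified" is recorded as a number rather than as a list operation, a notification can arrive before the waiter has parked and still not be lost.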

notifyListAdd

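notifyListAdd registers the caller (the current goroutine) in the notification order by handing it a ticket. The goroutine is not linked into the waiter list until notifyListWait runs. Every call to notifyListAdd must eventually be followed by a call to notifyListWait with the returned ticket.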

func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return l.wait.Add(1) - 1
}

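notifyListAdd returns a ticket, which is then passed to notifyListWait. notifyListWait is where the goroutine actually blocks until it is notified. Taking the ticket is a single atomic increment, so concurrent callers are safe without the list's lock.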
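Concurrent callers still get distinct, consecutive tickets, because the ticket is taken with one atomic Add. An example is several readers that hold an RWMutex read lock at the same time. The sketch below uses a local counter as a hypothetical stand-in for notifyList.wait.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// collectTickets lets n goroutines take tickets concurrently from one counter,
// the way notifyListAdd does, and returns the tickets in goroutine order.
func collectTickets(n int) []uint32 {
	var wait atomic.Uint32                                  // hypothetical stand-in for notifyList.wait
	takeTicket := func() uint32 { return wait.Add(1) - 1 } // Add returns the new value

	tickets := make([]uint32, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tickets[i] = takeTicket() // each goroutine writes only its own slot
		}(i)
	}
	wg.Wait()
	return tickets
}

func main() {
	fmt.Println(collectTickets(5)) // a permutation of 0..4; the order varies between runs
}
```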

notifyListWait

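notifyListWait is where the goroutine really waits. If its ticket has already been notified (less(t, l.notify)), it returns immediately. Otherwise it wraps the current goroutine in a sudog and appends it to the tail of the list. It then parks until woken; goparkunlock releases l.lock at the same moment.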

// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}
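notifyListWait compares tickets with `less` rather than `<`, because both counters can wrap around. As far as we can tell, the runtime's helper is a signed-difference comparison like the standalone copy below. The example shows why this handles wraparound where plain `<` does not.

```go
package main

import "fmt"

// less reports whether ticket a comes before ticket b, assuming the two are
// less than 2^31 apart (the bound mentioned in the notifyList comment).
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

func main() {
	fmt.Println(less(1, 2))          // true
	fmt.Println(less(2, 1))          // false
	fmt.Println(less(0xFFFFFFFF, 0)) // true: ticket 0 was handed out after the wrap

	var a, b uint32 = 0xFFFFFFFF, 0
	fmt.Println(a < b) // false: plain < gets the wrapped order wrong
}
```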

notifyListNotifyAll

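Wakes every goroutine in the notification queue. Under the lock it detaches the list and advances notify to wait. It then readies the detached waiters outside the lock.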

// notifyListNotifyAll notifies all entries in the list.
//
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	atomic.Store(&l.notify, l.wait.Load())
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

notifyListNotifyOne

Takes one goroutine out of the notification queue (the one whose ticket equals notify) and wakes it.

// notifyListNotifyOne notifies one entry in the list.
//
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if l.wait.Load() == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == l.wait.Load() {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

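In short, notifyListNotifyOne:

  • returns immediately if no goroutine is waiting (wait == notify);
  • takes t = notify and advances notify to t+1;
  • searches the list for the sudog whose ticket equals t and wakes it. If that goroutine has not parked yet, it is not on the list. It will still see the new notify value and return without parking.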

Wait

Wait blocks the current goroutine until it is notified.

func (c *Cond) Wait() {
	c.checker.check()
	t := runtime_notifyListAdd(&c.notify) // first take a ticket (join the notify list)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t) // block until this ticket is notified
	c.L.Lock()
}

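The Wait source shows that Wait **first takes a ticket, then releases the lock and parks. Once woken, it reacquires the lock before returning. If other goroutines are competing for the lock at that point, the woken goroutine blocks until it gets the lock.** Because the ticket is taken before c.L.Unlock(), a Signal or Broadcast is not lost even if it happens after the unlock but before the goroutine actually parks. In that case notifyListWait sees that the ticket has already been notified and returns immediately.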

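Note that Wait releases the lock itself, so the caller must already hold c.L when calling Wait. If c.L is a sync.Mutex that is not locked, the Unlock inside Wait fails with the fatal error "sync: unlock of unlocked mutex". Unlike an ordinary panic, this error cannot be recovered.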

Signal

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
//
// Signal() does not affect goroutine scheduling priority; if other goroutines
// are attempting to lock c.L, they may be awoken before a "waiting" goroutine.
func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

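When some condition is met, Signal wakes one goroutine in the notification queue, namely the one that has waited longest. The caller may hold c.L during the call but is not required to.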

Broadcast

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

Broadcast wakes every goroutine that is waiting for a notification.
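As an illustration, the sketch below parks several goroutines on one Cond and releases all of them with a single Broadcast. The `started` flag and the `broadcastDemo` helper are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastDemo parks n goroutines on the same Cond and releases them all
// with a single Broadcast. It returns how many goroutines got past Wait.
func broadcastDemo(n int) int {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	started := false // hypothetical "go" flag
	woken := 0

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !started {
				cond.Wait()
			}
			woken++ // still holding mu, so this increment is race-free
			mu.Unlock()
		}()
	}

	mu.Lock()
	started = true
	mu.Unlock()
	cond.Broadcast() // one call releases every parked waiter

	wg.Wait()
	return woken
}

func main() {
	fmt.Println(broadcastDemo(5), "goroutines released") // 5 goroutines released
}
```

Goroutines that had not reached Wait before the Broadcast see `started == true` and skip waiting, so the count is always n.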

sync.Cond Pitfalls

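  • Not holding the lock before calling Wait: Wait releases the lock first and only then waits to be woken. If the lock was not acquired before calling Wait, Wait tries to unlock a lock that is not held. For a sync.Mutex this is an unrecoverable fatal error.

  • Not re-checking the condition after waking: when a goroutine is woken, the condition is not guaranteed to hold. For example, Broadcast wakes every waiter, and another goroutine may reacquire the lock first and change the state. Running the following code anyway leads to logic errors. Always re-check the condition in a for loop after Wait returns, and run the business logic only once the condition holds.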
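The second pitfall can be shown with a shared item counter. The `items` counter and the `consumeAll` helper are hypothetical names. The producer adds one item at a time and calls Broadcast, which wakes every consumer even though only one item is available. The for-loop re-check sends the losers back to Wait. With `if` instead of `for`, several woken consumers could take the same item and drive the counter negative.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll starts n consumers that each take one item, then produces n items,
// calling Broadcast after each one. It returns the smallest item count any
// consumer left behind; with the for-loop re-check it never drops below zero.
func consumeAll(n int) (minSeen int) {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	items := 0
	minSeen = n

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for items == 0 { // `if` here would let a woken goroutine take an item that is already gone
				cond.Wait()
			}
			items--
			if items < minSeen {
				minSeen = items
			}
			mu.Unlock()
		}()
	}

	for i := 0; i < n; i++ {
		mu.Lock()
		items++
		mu.Unlock()
		cond.Broadcast() // wakes every waiter, but only one new item exists
	}

	wg.Wait()
	return minSeen
}

func main() {
	fmt.Println("smallest item count seen:", consumeAll(8))
}
```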

Making Goroutines Run in Order

Suppose we have three goroutines that print cat, dog and pig 100 times each. They must take turns, so the output is cat, dog, pig, cat, dog, pig, and so on.

We can implement this with either sync.Cond or channels.

Implementation with sync.Cond

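Idea: keep a shared field that records whose turn it is. A goroutine prints only when that field matches its own type. Otherwise it blocks until the condition holds, and then it prints.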

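// Imports for the test code in this section (assumed to live in a _test.go file).
import (
	"fmt"
	"sync"
	"testing"
)

type Display struct {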
	condition *sync.Cond
	ttype     int // 0 cat 1 dog 2 pig
	times     int
}

func TestSequenceDisplay(t *testing.T) {

	display := Display{}
	var mutex sync.Mutex
	display.condition = sync.NewCond(&mutex)
	display.times = 100
	var wg sync.WaitGroup
	wg.Add(3)
	go display.Cat(&wg)
	go display.Dog(&wg)
	go display.Pig(&wg)
	wg.Wait()
}

func (d *Display) Cat(wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i < d.times; {
		d.condition.L.Lock() // acquire the mutex

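		// Check the condition in a for loop: while it does not hold, Wait blocks and the
		// code below the loop does not run, so this goroutine gives up the CPU.
		// We wake every waiting goroutine (Broadcast), so a woken goroutine may find it is
		// still not its turn; in that case it simply waits again until the condition holds.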
		for d.ttype != 0 {
			d.condition.Wait()
		}

		// The condition holds: it is cat's turn.
		fmt.Println("cat")
		d.ttype = 1 // hand the turn to the Dog goroutine
		i++

		d.condition.L.Unlock()  // release the lock
		d.condition.Broadcast() // wake the other goroutines so they re-check the condition
	}
}

func (d *Display) Dog(wg *sync.WaitGroup) {
	defer wg.Done()

	for i := 0; i < d.times; {
		d.condition.L.Lock()

		for d.ttype != 1 {
			d.condition.Wait()
		}

		fmt.Println("dog")
		d.ttype = 2
		i++

		d.condition.L.Unlock()
		d.condition.Broadcast()
	}

}

func (d *Display) Pig(wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < d.times; {
		d.condition.L.Lock()

		for d.ttype != 2 {
			d.condition.Wait()
		}

		fmt.Println("pig")
		d.ttype = 0
		i++

		d.condition.L.Unlock()
		d.condition.Broadcast()
	}

}

Implementation with channels

We can use unbuffered channels. The idea is for each goroutine to wake one specific goroutine, the next in line, instead of waking every blocked goroutine.

func TestSequenceDisplayByChannel(t *testing.T) {

	c := make(chan struct{})
	d := make(chan struct{})
	p := make(chan struct{})
	end := make(chan struct{})

	go cat(c, d)
	go dog(d, p)
	go pig(p, c, end)

	c <- struct{}{}
	<-end

}

func cat(start, next chan struct{}) {
	for i := 0; i < 100; i++ {
		<-start // block until it is this goroutine's turn
		fmt.Println("cat")
		next <- struct{}{}
	}
}

func dog(start, next chan struct{}) {
	for i := 0; i < 100; i++ {
		<-start 
		fmt.Println("dog")
		next <- struct{}{}
	}
}

func pig(start, next, end chan struct{}) {
	for i := 0; i < 100; i++ {
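		<-start
		fmt.Println("pig") // print on every round, not only the last one

		if i == 99 {
			end <- struct{}{} // last round: let the test finish
		} else {
			next <- struct{}{} // hand the turn back to cat
		}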
	}
}

sync.Cond Usage Template

cond.L.Lock()
for !condition { // re-check after every wake-up
    cond.Wait()
}

// business logic
cond.L.Unlock()

sync.Cond Use Cases

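  • Broadcast-style notification ("one call, many responses"): a state change must be announced to every waiting goroutine. For example, when one goroutine changes a global configuration, the other goroutines need to be told to refresh their own configuration.
  • Frequently toggling state: the state keeps switching between satisfied and unsatisfied. sync.Cond can broadcast any number of times, whereas a closed channel cannot be reopened and would have to be recreated for each round.
  • High-performance caches/queues: a hand-written buffer queue or object pool can use sync.Cond to block producers or consumers while it is full or empty.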
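For the queue scenario, a common shape is one mutex shared by two Conds: one for "not empty" and one for "not full". The sketch below is a minimal illustration of that pattern, not a production-ready queue. `BoundedQueue` and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// BoundedQueue is a hypothetical blocking FIFO with a fixed capacity,
// built from one Mutex and two Conds that share it.
type BoundedQueue struct {
	mu       sync.Mutex
	notEmpty *sync.Cond
	notFull  *sync.Cond
	items    []int
	capacity int
}

func NewBoundedQueue(capacity int) *BoundedQueue {
	q := &BoundedQueue{capacity: capacity}
	q.notEmpty = sync.NewCond(&q.mu)
	q.notFull = sync.NewCond(&q.mu)
	return q
}

// Put blocks while the queue is full.
func (q *BoundedQueue) Put(v int) {
	q.mu.Lock()
	for len(q.items) == q.capacity {
		q.notFull.Wait()
	}
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.notEmpty.Signal() // one new item: waking one consumer is enough
}

// Get blocks while the queue is empty and returns the oldest item.
func (q *BoundedQueue) Get() int {
	q.mu.Lock()
	for len(q.items) == 0 {
		q.notEmpty.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	q.mu.Unlock()
	q.notFull.Signal() // one free slot: waking one producer is enough
	return v
}

func main() {
	q := NewBoundedQueue(2)
	go func() {
		for i := 1; i <= 5; i++ {
			q.Put(i) // blocks whenever two items are already queued
		}
	}()
	sum := 0
	for i := 0; i < 5; i++ {
		sum += q.Get()
	}
	fmt.Println("sum:", sum) // sum: 15
}
```

Each state change makes exactly one waiter useful, which is why Signal is used here rather than Broadcast. The same queue could also be written as a buffered channel; the Cond version is shown only to illustrate the pattern.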

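Almost anything done with sync.Cond can also be done with channels. The sync.Cond documentation itself notes that for many simple use cases channels are the better choice: Broadcast corresponds to closing a channel, and Signal to sending on one. As a result, sync.Cond shows up relatively rarely in application code and open-source projects. If you do reach for it, weigh it against a channel-based design, including performance.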