上篇中,介绍了关于golang互斥锁的内容,了解了golang互斥锁基于runtime信号量的同步机制,以及runtime信号量的实现。本篇书接上回,继续聊一点读写锁的东西。同样的,不会纠结于某些细节,而是关注于一些我感兴趣的点,以及整体的设计。
说起读写锁,大家都知道写写互斥、读写互斥、读读并发。但golang的读写锁还有一个特点,即使当前读写锁被reader持有,但是当有writer在等待该锁时,新的reader也会被阻塞而不能持有该锁,这是为了防止writer的饥饿现象。同样的,当writer解锁时,也会优先唤醒阻塞在其上的reader,防止reader的饥饿现象。这是为了保证公平性。
首先看下sync.RWMutex的结构体。RWMutex具有5个字段。先简单介绍每个字段的含义。
- w为互斥锁,writer在RWMutex.Lock时首先要获取w,其作用为实现写写互斥。说的更仔细一点,w保证了只有一个writer能进入到获取RWMutex写权限的过程中,其他的写操作被阻塞在w上。进入获取RWMutex写权限的writer或者成功加锁,或者被读操作阻塞在writerSem上。
- writerSem为写信号量,被读锁阻塞的writer会挂在writerSem的阻塞队列中。当持有读锁的reader数归0后,会尝试唤醒writerSem的writer。同一时刻,阻塞在writerSem上的writer最多一个。
- readerSem为读信号量,被写锁阻塞的reader会挂在readerSem的阻塞队列中。当写操作执行完成时,会先唤醒阻塞在readerSem上的所有reader,再释放w。以此保证不会造成reader饥饿。
- readerCount和readerWait相对绕一点。这里粗糙地描述一下,细节留到后面代码环节介绍。readerWait表示当前正在执行的reader的数量,当其掉0后可以切换写锁;readerCount中含有阻塞的reader的数量,当写锁释放时会将readerCount个reader全部唤醒。
type RWMutex struct {w Mutex // held if there are pending writerswriterSem uint32 // semaphore for writers to wait for completing readersreaderSem uint32 // semaphore for readers to wait for completing writersreaderCount int32 // number of pending readersreaderWait int32 // number of departing readers
}
加写锁
获取写锁时,首先会获取互斥锁。上一小节中也提到过,这个操作保证了同时只有一个writer能够进入到获取RWMutex写权限的过程中,这是写写互斥的子集,当然能够保证写写互斥。
获取互斥锁后,会将readerCount字段减去rwmutexMaxReaders,这是唯一的readerCount小于0的情况,表示的是读写锁处于写锁状态或者有writer在等待写锁。这两种情况下,新来的reader都会被阻塞于readerSem上。
同时,会将readerCount的值赋给readerWait,这也是唯一的readerWait可能大于0的情况,表示的是当前持有写锁的reader的数量。如果readerWait大于0,则说明当前还有读请求在执行,writer挂起在writerSem上。
func (rw *RWMutex) Lock() {// First, resolve competition with other writers.rw.w.Lock()// Announce to readers there is a pending writer.r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders// Wait for active readers.if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {runtime_SemacquireMutex(&rw.writerSem, false, 0)}
}
加读锁
读写互斥是通过readerCount的正负来判断的。写权限请求会将readerCount减去极大值,使readerCount一直为负。所以获取读锁时,如果readerCount为负,则将reader挂起。但我们注意,此时虽然将reader挂起,但readerCount还是加1,说明此时readerCount中可能同时持有正在处理的reader的数量和阻塞的reader的数量。
func (rw *RWMutex) RLock() {if atomic.AddInt32(&rw.readerCount, 1) < 0 {// A writer is pending, wait for it.runtime_SemacquireMutex(&rw.readerSem, false, 0)}
}
解写锁
解写锁时,会首先将readerCount加上rwmutexMaxReaders,解除读写互斥。
上面我们提到,readerCount中可能同时持有正在处理的reader的数量和阻塞的reader的数量。当写锁被持有时,不可能存在正在处理的reader,所以此时readerCount表示的是被写锁阻塞的reader的数量。此时会依次将所有阻塞的reader全部唤醒。让读请求先处理,然后再释放互斥锁,允许写请求进入。这也是公平性的体现。
func (rw *RWMutex) Unlock() {// Announce to readers there is no active writer.r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)if r >= rwmutexMaxReaders {race.Enable()fatal("sync: Unlock of unlocked RWMutex")}// Unblock blocked readers, if any.for i := 0; i < int(r); i++ {runtime_Semrelease(&rw.readerSem, false, 0)}// Allow other writers to proceed.rw.w.Unlock()
}
解读锁
解读锁时,会将readerCount首先减1,将当前正在处理的reader的数量减少。
如果这时readerCount的值为负,上面提到过readerCount小于0的情况只有读写锁处于写锁状态或者有writer在等待写锁,在解读锁时不可能被持有写锁,所以只能是有writer在等待写锁。如果有writer在等待,则查看解读锁后是否还有reader在执行,如果没有,则唤醒阻塞的写锁。
func (rw *RWMutex) RUnlock() {if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {// Outlined slow-path to allow the fast-path to be inlinedrw.rUnlockSlow(r)}
}func (rw *RWMutex) rUnlockSlow(r int32) {if r+1 == 0 || r+1 == -rwmutexMaxReaders {race.Enable()fatal("sync: RUnlock of unlocked RWMutex")}// A writer is pending.if atomic.AddInt32(&rw.readerWait, -1) == 0 {// The last reader unblocks the writer.runtime_Semrelease(&rw.writerSem, false, 1)}
}
总结
互斥锁中,更多是介绍了偏低层的机制和实现。在本篇中,更多是着眼于读写锁的上层的设计。不得不说,读写锁的设计还是非常有意思的,理解其设计对更深刻地理解同步机制非常有帮助,同时,对我们程序中构建同步机制也会有所借鉴。