1.RWMutex常用方法
- Lock/Unlock
- RLock/RUnlock
- RLocker 为读操作返回一个Locker接 口的对象
2. RWMutex使用方法
func main() {
var counter Counter
for i := 0; i < 10; i++ { // 10个reader
go func() {
for {
counter.Count() // 计数器读操作
time.Sleep(time.Millisecond)
}
}()
}
for { // 一个writer
counter.Incr() // 计数器写操作
time.Sleep(time.Second)
}
}
// 一个线程安全的计数器
type Counter struct {
mu sync.RWMutex
count uint64
}
// 使用写锁保护
func (c *Counter) Incr() {
c.mu.Lock()
c.count++
c.mu.Unlock()
}
/ 使用读锁保护
func (c *Counter) Count() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.count
}
在读多写少的场景下,使用RWMutex性能要更好些
3.实现原理
RWMutex是基于Mutex实现的,以写优先来实现的
type RWMutex struct {
w Mutex // 互斥锁解决多个writer的竞争
writerSem uint32 // writer信号量
readerSem uint32 // reader信号量
readerCount int32 // reader的数量
readerWait int32 // writer等待完成的reader的数量(记录写锁来的时候前面还需要等待多少个读锁,其实也是在某一时刻 readerCount 的复制值)
}
const rwmutexMaxReaders = 1 << 30
说明:
- 这里的常量 rwmutexMaxReaders,定义了最大的 reader 数量。
- 字段 w:为 writer 的竞争锁而设计;
- 字段 readerCount:记录当前 reader 的数量(以及是否有 writer 竞争锁);
- readerWait:记录 writer 请求锁时需要等待 read 完成的 reader 的数量;
- writerSem 和 readerSem:都是为了阻塞设计的信号量。
3.1 RLock/RUnlock的实现
func (rw *RWMutex) RLock() {
// 每次goroutine获得读锁,readerCount+1
// 这里分两种情况:
// 1. 当判断大于等于0, 证明当前没有写锁, 那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
// 2. 当判断小于0, 证明当前有写锁(Lock时会readerCount-rwmutexMaxReaders, 因此会小于0), 所以通过readerSem读信号量, 使读操作睡眠等待
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// rw.readerCount是负值的时候,意味着此时有writer等待请求锁,因为writer优先
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
// 这里分两种情况:
// 释放读锁, readerCount减1
// 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作
// 2.若readerCount小于等于0, 证明已经没有读锁, 可以唤醒写锁(若有)
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
rw.rUnlockSlow(r) // 有等待的writer
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
// 1. 本来就没读锁, 调用RUnlock就发生panic
// 2. 超过读锁的最大限制就发生panic
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// readerWait--操作,如果readerWait--操作之后的值为0,说明,写锁之前,已经没有读锁了
// 通过writerSem信号量,唤醒队列中第一个阻塞的写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 最后一个reader了,writer终于有机会获得锁了
runtime_Semrelease(&rw.writerSem, false, 1)
}
3.2 Lock的实现
func (rw *RWMutex) Lock() {
// 首先解决其他writer竞争问题
rw.w.Lock()
//// 先readerCount-rwmutexMaxReaders<0,标识有写锁来了,让后续来的读锁堵塞无法拿到锁
// 再加回rwmutexMaxReaders得到当前读锁的个数
// 反转readerCount,告诉reader有writer竞争锁
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxRead
// 如果当前有reader持有锁,那么需要等待
//// 读锁个数不为0的时候,写锁阻塞,把当前的读锁个数readerCount赋值给readerWait
// 用于标识写锁前面还需要等待多少个读锁
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
3.3 Unlock的实现
func (rw *RWMutex) Unlock() {
// 告诉reader没有活跃的writer了
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
// 唤醒阻塞的reader们
for i := 0; i < int(r); i++ {
runtime
_
Semrelease(&rw.readerSem, false, 0)
}
// 释放内部的互斥锁
rw.w.Unlock()
}
4 使用RWMutex的注意点
- 不可复制
- 重入导致死锁
- 释放未加锁的RWMutex,会产生panic