ReentrantReadWriteLock源码解析
概述
ReentrantReadWriteLock是Lock的另一种实现方式,我们已经知道了ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性。在实际应用中,大部分情况下对共享数据(如缓存)的访问都是读操作远多于写操作,这时ReentrantReadWriteLock能够提供比排他锁更好的并发性和吞吐量。
读写锁内部维护了两个锁,一个用于读操作,一个用于写操作。所有 ReadWriteLock实现都必须保证 writeLock操作的内存同步效果也要保持与相关 readLock的联系。也就是说,成功获取读锁的线程会看到写入锁之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
示例(JDK自带示例)
示例1:利用重入来执行升级缓存后的锁降级
class CachedData {
Object data;
volatile boolean cacheValid; //缓存是否有效
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock(); //获取读锁
//如果缓存无效,更新cache;否则直接使用data
if (!cacheValid) {
// Must release read lock before acquiring write lock
//获取写锁前须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//锁降级,在释放写锁前获取读锁
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}
use(data);
rwl.readLock().unlock(); //释放读锁
}
}
示例2:使用 ReentrantReadWriteLock 来提高 Collection 的并发性
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock(); //读锁
private final Lock w = rwl.writeLock(); //写锁
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
源码解析
ReentrantReadWriteLock 是基于AQS实现的(参考AbstractQueuedSynchronizer源码解析),它的自定义同步器(继承AQS)需要在同步状态(一个整型变量state)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {} { /** Inner class providing readlock */private final ReentrantReadWriteLock.ReadLock readerLock;/** Inner class providing writelock */private final ReentrantReadWriteLock.WriteLock writerLock;/** Performs all synchronization mechanics */final Sync sync; }ReadLock与WriteLock
public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean tryLock() {return sync.tryReadLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void unlock() {sync.releaseShared(1);}public Condition newCondition() {throw new UnsupportedOperationException();}} public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock( ) {return sync.tryWriteLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}public Condition newCondition() {return sync.newCondition();}public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}public int getHoldCount() {return sync.getWriteHoldCount();}}ReadLock与WriteLock比较:
- ReadLock使用共享模式,WriteLock使用独占模式
- ReadLock与WriteLock使用同一个Sync
Sync
数据结构
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/*
SHARED_SHIFT相关变量与方法为锁状态访问逻辑。
*/
//低16位为写锁状态,高16位为读锁状态。
static final int SHARED_SHIFT = 16;
//读锁每次增加的单位(因为在高16位,所以加1,即相当于整个int加 1个SHARED_UNIT )
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//读锁最大数(2^16 -1 )
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//写锁状态掩码(2^16-1)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**返回读锁数量 ,高16位(无符号右移16位) */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁数量
*EXCLUSIVE_MASK,高位全是0,与操作返回写状态数量
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
//当前读线程的计数器
static final class HoldCounter {
int count = 0; //当前读线程总的重入次数。
//线程ID
final long tid = getThreadId(Thread.currentThread());
}
/**
* ThreadLocal 子类
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
/**
*/
private transient ThreadLocalHoldCounter readHolds;
//当前线程缓存的HoldCounter ,
private transient HoldCounter cachedHoldCounter;
/**
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}
读锁的获取与释放
读锁的lock和unlock的实际实现对应Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();如果写锁线程数 != 0 ,且独占锁不是当前线程则返回失败,因为存在锁降级(先write,后read),if (exclusiveCount(c) != 0 && //获取写锁数量getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c); //获取读锁数量。if (!readerShouldBlock() && //读操作不被阻塞。r < MAX_COUNT && //读锁数量未到最大值compareAndSetState(c, c + SHARED_UNIT) //竞争成功) {//r == 0,表示第一个读锁线程,第一个读锁firstRead是不会加入到readHolds中if (r == 0) {firstReader = current; // 设置第一个读线程firstReaderHoldCount = 1; // 读线程占用的资源数为1} else if (firstReader == current) {// 当前线程为第一个读线程,表示第一个读锁线程重入,则第一个线程占用资源+1firstReaderHoldCount++;} else {// 获取计数器HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))// 计数器为空或者计数器的tid不为当前正在运行的线程的tid,则获取当前 线程对应的计数器cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)//计数为0,则表示未加入到缓存,加入到readHolds中readHolds.set(rh);//把当前线程的计数加1rh.count++;}//返回1,则表示acquire成功,return 1;}return fullTryAcquireShared(current);}final int fullTryAcquireShared(Thread current) {/** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {//写线程不是本线程,则返回-1if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) { // 写线程数量为0并且读线程被阻塞// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) { //第一个读线程是本线程。// assert firstReaderHoldCount > 0;} else {//第一个读线程不是本线程。if (rh == null) {//获取缓存holdrh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {//获取本线程的Holdrh = readHolds.get();if (rh.count == 0)readHolds.remove(); //如果为0,则需要移出。}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异常throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) { // 比较并且设置成功if (sharedCount(c) == 0) { // 读线程数量为0,设置第一个读线程firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {//第一个读线程计数加1.firstReaderHoldCount++;} else {//不是第一个线程,则获取计数器。if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}读锁acquire流程图
注意:
更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}
读锁的释放工作,主要是包括更新计数器的值(重入次数),以及更新state(总的读锁重入次数)
计数器:HoldCounter
在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。
static final class HoldCounter {int count = 0;final long tid = Thread.currentThread().getId(); }static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();} }private transient ThreadLocalHoldCounter readHolds;private transient HoldCounter cachedHoldCounter;在HoldCounter中仅有count和tid两个变量,其中count代表着计数器,tid是线程的id。但是如果要将一个对象和线程绑定起来仅记录tid肯定不够的,而且HoldCounter根本不能起到绑定对象的作用,只是记录线程tid而已。
诚然,在java中,我们知道如果要将一个线程和对象绑定在一起只有ThreadLocal才能实现 。ThreadLocalHoldCounter继承ThreadLocal,并且重写了initialValue方法。
故而,HoldCounter应该就是绑定线程上的一个计数器,而ThradLocalHoldCounter则是线程绑定的ThreadLocal。从上面我们可以看到ThreadLocal将HoldCounter绑定到当前线程上,同时HoldCounter也持有线程Id,这样在释放锁的时候才能知道ReadWriteLock里面缓存的上一个读取线程(cachedHoldCounter)是否是当前线程。这样做的好处是可以减少ThreadLocal.get()的次数,因为这也是一个耗时操作。需要说明的是这样HoldCounter绑定线程id而不绑定线程对象的原因是避免HoldCounter和ThreadLocal互相绑定而GC难以释放它们(尽管GC能够智能的发现这种引用而回收它们,但是这需要一定的代价),所以其实这样做只是为了帮助GC快速回收对象而已。
写锁的获取与释放
写锁的lock和unlock调用的独占式同步状态的获取与释放,因此真实的实现就是Sync的 tryAcquire和 tryRelease。
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();//写线程数量(即获取独占锁的重入数)int w = exclusiveCount(c);//当前同步状态state != 0,说明已经有其他线程获取了读锁或写锁if (c != 0) {// 当前state不为0,此时:如果写锁状态为0说明读锁此时被占用返回false;// 如果写锁状态不为0且写锁没有被当前线程持有返回falseif (w == 0 // w==0 ,说明写锁为0,c!=0 说明读锁正在操作||current != getExclusiveOwnerThread() // w != 0 说明有写锁在操作,但是不是本线程,则返回false。)return false;//判断同一线程获取写锁是否超过最大次数(65535),支持可重入if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//此时当前线程已持有写锁,现在是重入,所以只需要修改锁的数量即可。setState(c + acquires);return true;}//到这里说明此时c=0,读锁和写锁都没有被获取if (writerShouldBlock() //判断写是否阻塞||!compareAndSetState(c, c + acquires) //竞争失败,)return false;//设置写锁为当前线程所有setExclusiveOwnerThread(current);return true; }更新写锁重入次数以及state
protected final boolean tryRelease(int releases) {//若锁的持有者不是当前线程,抛出异常if (!isHeldExclusively())throw new IllegalMonitorStateException();//写锁的新线程数int nextc = getState() - releases;//如果独占模式重入数为0了,说明独占模式被释放boolean free = exclusiveCount(nextc) == 0;if (free)//若写锁的新线程数为0,则将锁的持有者设置为nullsetExclusiveOwnerThread(null);//设置写锁的新线程数//不管独占模式是否被释放,更新独占重入数setState(nextc);return free; }公平锁与非公平锁
NonfairSync
static final class NonfairSync extends Sync {private static final long serialVersionUID = -8159625535654395037L;final boolean writerShouldBlock() {return false; // writers can always barge}final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}FairSync
static final class FairSync extends Sync {private static final long serialVersionUID = -2274990926593161451L;final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}}NonFairSync与FairSync的区别:writerShouldBlock()与readerShouldBlock()方法的实现不同
readerShouldBlock
- NonfairSync.readerShouldBlock
调用:apparentlyFirstQueuedIsExclusive,如果队列中第一个节点是独占式,则返回true,堵塞读锁。
//判断是否:队列中第一个等待节点是独占模式的final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null && //head节点不为空。(s = h.next) != null && //head的后置节点不为空!s.isShared() && //head的后置节点不是共享模式s.thread != null; //head的后置节点关联了线程。}调用apparentlyFirstQueuedIsExclusive方法, 为了防止写线程饥饿等待,如果同步队列中的第一个线程是以独占模式获取锁(写锁),那么当前获取读锁的线程需要阻塞,让队列中的第一个线程先执行。
- FairSync.readerShouldBlock
调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true
writerShouldBlock
- NonfairSync.writerShouldBlock
直接返回false,说明非公平锁不堵塞写锁。
- FairSync.writerShouldBlock
调用:hasQueuedPredecessors,判断是否是等待队列中是否有前置节点,有则返回true
超强干货来袭 云风专访:近40年码龄,通宵达旦的技术人生
总结
以上是生活随笔为你收集整理的ReentrantReadWriteLock源码解析的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: ReentrantLock源码
- 下一篇: LinkedBlockingQueue源