欢迎访问 生活随笔!

生活随笔

当前位置: 首页 >

AbstractQueuedSynchronizer源码解析

发布时间:2024/4/13 62 豆豆
生活随笔 收集整理的这篇文章主要介绍了 AbstractQueuedSynchronizer源码解析 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

目录

 

关于AbstractQueuedSynchronizer

基本数据结构

节点结构

同步队列结构

实现

子类需要实现的方法

独占模式实现

独占模式同步队列示意

共享模式

共享模式同步队列示意:


关于AbstractQueuedSynchronizer

JDK1.5之后引入了并发包java.util.concurrent,里面包含了很多并发控制锁类,其核心是:AbstractQueuedSynchronizer,其数据结构为链表方式的双向队列。

基本数据结构

节点结构

链表节点的字段含义如下:

Class:Node
字段类型初始值意义
SHAREDfinal Node任意一个Node对象一个指示器,用于标识Node处于共享模式。
EXCLUSIVEfinal Nodenull一个指示器,用于标识Node处于独占模式。
CANCELLEDfinal int1waitStatus值,表示Node处于取消状态。一般当Node处于超时或者中断,设置此值。取消节点关联的线程不会重新阻塞。
SIGNALfinal int-1waitStatus值,表示Node的后续Node 已经或者即将通过park 阻塞。当此节点取消或者release时,后续节点需要unpark。为了避免竞争。acquire方式必须指示需要SIGNAL,重试acquire,在失败的情况下阻塞。
CONDITIONfinal int-2waitStatus值,表示节点处于等待队列。当在某个时间点set to 0 ,用于同步队列
PROPAGATEfinal int-3waitStatus值,用于共享模式,表示下一次acquire无条件的传播。
waitStatusvolatile int0节点状态
prevvolatile Nodenull前置节点
nextvolatile Nodenull后续节点
threadvolatile Threadnull关联线程
nextWaiterNodenull指向下一个Node is waiting on condition。或者指向SHARED,EXCLUSIVE表示模式。

同步队列结构

同步队列
字段类型初始值意义
headvolatile Nodenull同步队列头节点,延迟初始化,必须通过setHead方法设置值。
tailvolatile Nodenull同步队列尾节点,延迟初始化,通过enq方法设置。
statevolatile int0状态。用于追踪同步状态,具体由各子类处理。

 

  • 每次加入节点到同步队列,都加在尾部,释放节点从头节点的后续节点释放。
  • 一个节点位于队列头,并不保证acquire成功,只是尽量尝试acquire。
  •  

     

     

    实现

    AbstractQueuedSynchronizer仅实现抽象方法控制并发,由子类实现具体的资源控制。

    子类需要实现的方法

    方法意义
    tryAcquire尝试独占模式下acquire,失败则进入同步队列。
    tryRelease尝试独占模式下release。
    tryAcquireShared尝试共享模式下acquire,失败则进入同步队列。
    tryReleaseShared尝试共享模式下release。
    isHeldExclusively指示同步器是否在独占模式下被当前线程占用

    独占模式实现

    /**/public final void acquire(int arg) {if (!tryAcquire(arg) && --acquire失败,/* 第一尝试acquire失败后,加入到同步队列,会继续尝试*/acquireQueued(/* acquire失败,则加入一个节点到同步队列,节点为独占模式。*/addWaiter(Node.EXCLUSIVE), arg))/*acquireQueued 返回的是中断标识(中断标识以清除),如果为true ,重新设置中断标识*/selfInterrupt();}

       

    /* 节点加入同步队列之后,会尝试acquire。如果失败会通过park阻塞。 */ final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {//中断标志,标识此线程是否设置过中断标识。boolean interrupted = false;for (;;) {final Node p = node.predecessor(); //如果前一个节点是head节点,(head节点不关联线程),即本节点是第一个acquire失败的节点,则尝试acquire。if (p == head && tryAcquire(arg)) {//acquire成功,则把此节点设置为head节点(thread关联解除),返回中断标识。//【10】setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//不是第一个accquire失败的节点,则判断是否通过park阻塞。if (//判断是否需要park。shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//此线程 曾经被中断过。(中断标识被清除了)。interrupted = true;}} finally {//仅当出现异常时,才会进入此代码块,一般timeout或者中断,此时需取消节点。if (failed)cancelAcquire(node);}} /* 判断是否需要park */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//前一个节点的status为SIGNAL,表示后续节点需要parkif (ws == Node.SIGNAL) //【4】return true;if (ws > 0) {//ws > 0 ,仅当status=CANCELLED。则把取消节点剔除同步队列。do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/*状态为0或者PROPAGATE,独占模式为0,共享模式为PROPAGATE。则需要设置为SIGNAL,表示即将park(是否park由下一次acquire决定)。*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //【3】}return false;} private final boolean parkAndCheckInterrupt() {LockSupport.park(this); //【5】/**********此处时重点,以上代码执行后,线程会立即挂起。当线程unpark后,后续代码接着运行。*///返回线程的中断标识。同时清除中断标识。return Thread.interrupted();}

     

    /*取消acquire*/private void cancelAcquire(Node node) {// 节点不存在,返回。if (node == null)return;//取消线程关联。node.thread = null;// 跳过前置节点中的CANCELLED节点Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;Node predNext = pred.next;//设置状态为CANCELLEDnode.waitStatus = Node.CANCELLED;// 如果是尾节点,仅释放自身。if (node == tail && compareAndSetTail(node, pred)) {//设置前置节点的next为null。compareAndSetNext(pred, predNext, null);} else {// If successor needs signal, try to set pred's next-link// so it will get one. Otherwise wake it up to propagate.int ws;if (pred != head && //前置节点不为head。((ws = pred.waitStatus) == Node.SIGNAL //前置节点为CANCELLED||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)) //前置节点为0,并且设置为CANCELLED成功) &&pred.thread != null //关联线程) {//以上判断条件,表示前置节点不为head节点,并且为SIGNALNode next = node.next;if (next != null && next.waitStatus <= 0)//如果本节点之后仍有后续节点,则剔除本节点,修改指针。//此处未处理node.next.prev,node节点通过next是剔除的,通过prev是可以访问得到的。compareAndSetNext(pred, predNext, next);} else {unparkSuccessor(node);}node.next = node; // help GC}}

     

    private Node addWaiter(Node mode) {/* 构造一个节点,关联到当前线程*/Node node = new Node(Thread.currentThread(), mode);// 为了提高性能,先尝试一次加入同步队列。失败再尝试enq方式。enq方式时自旋方式(即死循环)Node pred = tail;if (pred != null) { node.prev = pred;//CASif (compareAndSetTail(pred, node)) { //【6】pred.next = node;return node;}}enq(node);return node;} /* 节点加入同步队列,并返回此节点。 采用自旋方式加入, */private Node enq(final Node node) {//自旋for (;;) {Node t = tail;//tail为null,则head必定为null,生成一个node作为head。if (t == null) { // Must initializeif (compareAndSetHead(new Node())) //【1】tail = head;} else {//加入节点成尾节点,并返回。node.prev = t;if (compareAndSetTail(t, node)) { //【2】t.next = node;return t;}}}} public final boolean release(int arg) {if (tryRelease(arg)) { //【7】//尝试成功Node h = head;if (h != null && h.waitStatus != 0) //head节点不为null,并且head状态不为0(在独占模式下,不为0,也不可能为CANCELLED,则为SIGNAL),则unpark后续节点。unparkSuccessor(h); //【8】return true;}return false;} private void unparkSuccessor(Node node) {/*status < 0 ,则把status设置为0*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {//后续节点为null,或者状态为CANCELLED。s = null;//从tail往前查找到第一个status<0的节点,选中作为要unpark的节点for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//unpark节点对应的线程。LockSupport.unpark(s.thread); //【9】}

    独占模式同步队列示意

    public class AbstractQueuedSynchronizerTest {@Testpublic void testAbstractQueuedSynchronizer() {Lock lock = new ReentrantLock();Runnable runnable0 = new ReentrantLockThread(lock);Thread thread0 = new Thread(runnable0);thread0.setName("t-0");Runnable runnable1 = new ReentrantLockThread(lock);Thread thread1 = new Thread(runnable1);thread1.setName("t-1");Runnable runnable2 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable2);thread2.setName("t-2");Runnable runnable3 = new ReentrantLockThread(lock);Thread thread2 = new Thread(runnable3);thread3.setName("t-3");Runnable runnable4 = new ReentrantLockThread(lock);Thread thread4 = new Thread(runnable4);thread4.setName("t-4");thread0.start();thread1.start();thread2.start();thread3.start();thread4.start();thread2.interrupt();for (;;);}private class ReentrantLockThread implements Runnable {private Lock lock;public ReentrantLockThread(Lock lock) {this.lock = lock;}@Overridepublic void run() {try {lock.lock();for (int i=0;i<1000000;i++);} finally {lock.unlock();}}}}

    以前假设各线程按顺序启动

    队列变化如下:

    1、同步队列初始化。thread:t-0 acquire成功。

    2、thread:t-1 启动,执行代码【1】,【2】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

    3、thread:t-2 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

    3、thread:t-3 启动,执行代码【6】,【3】,【4】处后状态如下图,然后执行【5】阻塞。

    4、thread:t-0,释放,执行代码【7】,【8】,经过【9】后,t-1 线程在代码【5】继续执行。经过【10】后状态如下图

    5、thread,t-2,t-3,t-4类似

    共享模式

    public final void acquireShared(int arg) {//独占模式:tryAcquire,返回boolean表示是否成功。//共享模式:tryAcquireShared,返回int,小于0,表示acquire失败。if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);// nextWaiter为SHARED,表示共享模式。【11】boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg); //【14】if (r >= 0) {//如果r>=0表示,表示许可有剩余。设置head,并继续传播setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted) selfInterrupt(); //与acquire最后2行代码一样,如果中断过,仍设置中断标识。failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}} private void setHeadAndPropagate(Node node, int propagate) {Node h = head; //指向原来的head,后面使用。setHead(node);//设置head,【15】if (propagate > 0 // 许可有剩余 【16】|| h == null // 原有head为null,表示节点已释放|| h.waitStatus < 0 // 状态为PROPAGATE或SIGNAL||(h = head) == null // 新的head为null|| h.waitStatus < 0 //或者新的head的状态<0) {Node s = node.next; //当前节点的后续节点if (s == null || s.isShared()) //当前节点为共享模式或者后续节点为nulldoReleaseShared(); //【17】}} public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) { //头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //【12】continue; // loop to recheck casesunparkSuccessor(h);//CAS成功,则...【13】}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}

    共享模式同步队列示意:

    package main.java.study;import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CountDownLatch;public class CountDownLatchTest {public class MapOper implements Runnable {CountDownLatch latch ;public MapOper(CountDownLatch latch) {this.latch = latch;}public void run() {try {SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(Thread.currentThread().getName() + "start:" + df.format(new Date()));latch.await();System.out.println(Thread.currentThread().getName() + "work:" + df.format(new Date()));} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}System.out.println(Thread.currentThread().getName()+" Sync Started!");}}public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubCountDownLatchTest test = new CountDownLatchTest();CountDownLatch latch = new CountDownLatch(1);Thread t1 = new Thread(test.new MapOper(latch));Thread t2 = new Thread(test.new MapOper(latch));Thread t3 = new Thread(test.new MapOper(latch));Thread t4 = new Thread(test.new MapOper(latch));t1.setName("Thread1");t2.setName("Thread2");t3.setName("Thread3");t4.setName("Thread4");t1.start();Thread.sleep(1500);t2.start();Thread.sleep(1500);t3.start();Thread.sleep(1500);t4.start();System.out.println("thread already start, sleep for a while...");Thread.sleep(1000);latch.countDown();}}

    队列变化:

    1、同步队列初始化:前3个工作线程调用await()方法,经过【11】,……,【5】线程挂起,状态如下:

    2、主线程调用countDown()方法,经过【12】,【13】,【9】唤醒线程t-1,t-1继续执行,经过【14】,【15】状态如下:

    3、t-1继续执行【16】,【17】唤醒下一个线程(node-2)

    4、t-2,t-3依次,都唤醒下一个。(每个节点都由前置节点对应的线程唤醒,唤醒立即返回)

    5、结束

     

     

     

    总结

    以上是生活随笔为你收集整理的AbstractQueuedSynchronizer源码解析的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。