欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架

发布时间:2025/4/5 编程问答 44 豆豆
生活随笔 收集整理的这篇文章主要介绍了 java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

看完书 java concurrency in practice 当然是想找点啥好玩的东东玩玩。 当看到了Doug Lee 的论文 << The java.util.concurrent Synchronizer Framework >> 大呼来的太晚喔, 前段时间看那个ReentrantLock 的代码真的是痛苦啊,不过现在也不晚不是。  呵呵, 上菜:这个框架的核心是一个AbstractQueuedSynchronizer 类 (下面简称AQS)  它基本上的思路是:

采用Template Method Pattern.  它实现了non-contended 的synchronization 算法;

继承 它的Subclass  一般不直接作为Synchronzier, 而是作为私有的实现 被用来delegate.  比如 他举了个例子:

class Mutex implements Lock, java.io.Serializable {

Java代码

// Our internal helper class

private static class Sync extends AbstractQueuedSynchronizer {

.....

}

// The sync object does all the hard work. We just forward to it.

private final Sync sync = new Sync();

public void lock()                { sync.acquire(1); }

public boolean tryLock()          { return sync.tryAcquire(1); }

public void unlock()              { sync.release(1); }

public Condition newCondition()   { return sync.newCondition(); }

public boolean isLocked()         { return sync.isHeldExclusively(); }

public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

public boolean tryLock(long timeout, TimeUnit unit)

throws InterruptedException {

return sync.tryAcquireNanos(1, unit.toNanos(timeout));

}

在这个 AQS 类中 提供了 两大类方法

acquire   其中还包括  非阻塞的 tryAcquire;  带 time out 的;可以通过interruption 来cancellality 的。

release   相对简单点, 因为在调用release 方法的时候基本上暗含了个意思当前的线程是获得了锁的。

在JUC 包中没有为各种Synchronizer 类提供统一的API 比如 Lock.lock, Semaphore.acquire, CountDownLatch.await  and  FutureTask.get 都是映射到这个AQS 的 acquire 方法。

要实现一个Synchronizer 的基本思路非常直接的

acquire 操作

采用CAS 操作去更新同步状态  如果不成功  {

enqueue 当前线程

block 当前线程

}

dequeue  当前线程  if it was queued      // 此时应该是当前线程被别的线程 release 来唤醒的。

release 操作 :

更新同步状态, 此时可以直接 set 而不通过 CAS。

假如 等待队列中还有线程  unblock  one or more

要支持这些操作 需要 三类基本的组件:

1,   Atomically managing synchronization state。   这个通过

private   volatile  int   state;

在处理acquire 的时候基本是通过用CAS 实现的  compareAndSetState 来

2,   Block /Unblock 线程 。 这个是通过 JUC 里面的一个封装类 LockSupport.park / unpark 来实现的。LockSupport 都deleget 到了 Unsafte 对应的方法

3, 维护 一个队列来存放 等待线程。 这个是通过一个 CLH queue 的变种。 它是一种linked queue 的通过 head 和tail 。

三个里面最复杂的就是这个 CLH queue 的维护了, queue 中节点被定义为  :

static final class Node {

Java代码

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {

return nextWaiter == SHARED;

}

final Node predecessor() throws NullPointerException {

Node p = prev;

if (p == null)

throw new NullPointerException();

else

return p;

}

Node() {    // Used to establish initial head or SHARED marker

}

Node(Thread thread, Node mode) {     // Used by addWaiter

this.nextWaiter = mode;

this.thread = thread;

}

Node(Thread thread, int waitStatus) { // Used by Condition

this.waitStatus = waitStatus;

this.thread = thread;

}

AQS 中定义了

Java代码

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

这个方法通常在 acquire 的时候如果 tryAcquire 失败就会将insert 一个 Node 到 tail 之后, 并且 node.prev 指向先前的 tail。 当然这个是在 compareAndSetTail(pred, node)  返回true 的时候比较简单, 否则就进入 enq(node) 方法 差不多也是这个逻辑insert 一个 Node 到 tail 之后。

在 1.6.0_25 版本里面AQS 就没有在 release 的时候 通过唤醒等待队列里面head指向的线程, 然后然后该线程是在 方法里面继续resume, 基本上会

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return interrupted;

}。

就是将head的下一个节点设为Head, 原来的head 呢 p.next = null,  p 不再引用任何对象, gc 就会发生作用。  这个基本上会的例外是啥呢, 新线程acquire 调用 tryAcquire 抢占成功 。 这个线程就只有继续park 了。

基本上来讲 有了 这个 AQS  , 我们要想实现一个 Synchronizer 就比较简单了,  最简单的情况下  treAcquire , tryRelease 实现下。AQS 里面提供的需要继承的方法不是采用abstract 的方式 而是用抛出 notImplementedException的 方式, 如果像那些tryAcquireShared 之类的 在 exclusive 模式下根本就不会被调用到,  我们也就根本就不必override。  比如 先前我们所的那个 Mutex 我们只需要定义它的内部类 Sync

Java代码

// Our internal helper class

private static class Sync extends AbstractQueuedSynchronizer {

// Report whether in locked state

protected boolean isHeldExclusively() {

return getState() == 1;

}

// Acquire the lock if state is zero

public boolean tryAcquire(int acquires) {

assert acquires == 1; // Otherwise unused

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

// Release the lock by setting state to zero

protected boolean tryRelease(int releases) {

assert releases == 1; // Otherwise unused

if (getState() == 0) throw new IllegalMonitorStateException();

setExclusiveOwnerThread(null);

setState(0);

return true;

}

// Provide a Condition

Condition newCondition() { return new ConditionObject(); }

}

这个框架是非常精巧的,  还有很多地方比如那个 ConditionObject 里面涉及到另外一个单独的condition queue。只有再慢慢的啃了。

另外贴一张本人手工画的图,非常潦草多包涵了。

总结

以上是生活随笔为你收集整理的java concurrent 框架,java.util.concurrent 包下的 Synchronizer 框架的全部内容,希望文章能够帮你解决所遇到的问题。

如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。