AQS

简介

是AbstractQueuedSynchronizer的简称,是一个用来构建锁和同步器的框架,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。

AQS源码分析

内部数据结构

它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待双端队列(多线程争用资源被阻塞时会进入此队列),并使用了两个指针head和tail用于标识队列的头部和尾部。


/**
* The synchronization state.
*/
private volatile int state;

/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}

/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

CLH队列并不是直接存储线程,而是存储拥有线程的Node节点

AQS的数据结构

Node的结构:

static final class Node {

// 标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();

// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;

// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;

// waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;

// waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;

/*waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点(共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。)*/
static final int PROPAGATE = -3;

// 0:新结点入队时的默认状态
// 等待状态,取值范围,-3,-2,-1,0,1
volatile int waitStatus;
volatile Node prev; // 前驱结点
volatile Node next; // 后继结点
volatile Thread thread; // 结点对应的线程
Node nextWaiter; // 等待队列里下一个等待条件的结点

// 判断共享模式的方法
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

资源共享模式

资源有两种共享模式,或者说两种同步方式:

  • 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如ReentrantLock。
  • 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占模式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占模式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享模式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享模式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

这些方法虽然都是protected方法,但是它们并没有在AQS具体实现,而是直接抛出异常,因为AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

这里没有定义成abstract的原因:

独占模式下只需要实现tryAcquire-tryRelease,而共享模式下只需要实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

(一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock。)

核心部分的源码分析

  1. 获取资源(独占)

    /**
    * Acquires in exclusive mode, ignoring interrupts. Implemented
    * by invoking at least once {@link #tryAcquire},
    * returning on success. Otherwise the thread is queued, possibly
    * repeatedly blocking and unblocking, invoking {@link
    * #tryAcquire} until success. This method can be used
    * to implement method {@link Lock#lock}.
    *
    * @param arg the acquire argument. This value is conveyed to
    * {@link #tryAcquire} but is otherwise uninterpreted and
    * can represent anything you like.
    */
    // arg是要获取的资源的个数,在独占模式下始终为1。
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }

    函数流程如下:

    1)tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);

    2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

    3)acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;

    4)如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    /**
    * Attempts to acquire in exclusive mode. This method should query
    * if the state of the object permits it to be acquired in the
    * exclusive mode, and if so to acquire it.
    *
    * <p>This method is always invoked by the thread performing
    * acquire. If this method reports failure, the acquire method
    * may queue the thread, if it is not already queued, until it is
    * signalled by a release from some other thread. This can be used
    * to implement method {@link Lock#tryLock()}.
    */
    // 尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
    // 需自定义同步器去实现 tryLock()
    protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
    }

    /**
    * Creates and enqueues node for current thread and given mode.
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    */
    // 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
    private Node addWaiter(Node mode) {
    // 生成该线程对应的Node节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 尝试快速方式直接放到队尾
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    // 使用CAS尝试,成功则返回
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    // 如果等待队列为空或者上述CAS失败,通过自旋CAS插入(enq方法)
    enq(node);
    return node;
    }

    /**
    * Inserts node into queue, initializing if necessary. See picture above.
    * @param node the node to insert
    * @return node's predecessor
    */
    // 将node加入队尾
    private Node enq(final Node node) {
    // CAS自旋,直到成功加入队尾
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
    if (compareAndSetHead(new Node()))
    tail = head;
    } else {
    //正常流程,放入队尾
    node.prev = t;
    if (compareAndSetTail(t, node)) {
    t.next = node;
    return t;
    }
    }
    }
    }

    /**
    * Acquires in exclusive uninterruptible mode for thread already in
    * queue. Used by condition wait methods as well as acquire.
    *
    * @param node the node
    * @param arg the acquire argument
    * @return {@code true} if interrupted while waiting
    */
    /**
    * 1. 结点进入队尾后,检查状态,找到安全休息点;
    * 2. 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
    * 3. 被唤醒后,看自己是否可以获取到资源。如果拿到,head指向当前结点,并返回从入队到获取到的整个过程中是否被中断过;如果没获取到,继续流程1。
    */
    final boolean acquireQueued(final Node node, int arg) {
    // 标记是否成功拿到资源
    boolean failed = true;
    try {
    // 标记等待过程中是否被中断过
    boolean interrupted = false;
    // 自旋
    for (;;) {
    // 获取前驱节点
    final Node p = node.predecessor();
    // 若前驱结点p是head,说明node是第二个结点,可以尝试去获取资源
    if (p == head && tryAcquire(arg)) {
    // 拿到资源后,将head指向该结点。
    // 所以head所指的结点,就是当前获取到资源的那个结点或null。
    setHead(node);
    // setHead中node.prev已置为null,此处再将head.next置为null,是为了方便GC回收以前的head结点。(意味着将之前拿完资源的结点出队)
    p.next = null; // help GC
    // 成功获取资源
    failed = false;
    //返回等待过程中是否被中断过
    return interrupted;
    }
    // 如果自己可以休息了,就进入waiting状态,直到被unpark()
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    // 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
    if (failed)
    cancelAcquire(node);
    }
    }

    /**
    * Checks and updates status for a node that failed to acquire.
    * Returns true if thread should block. This is the main signal
    * control in all acquire loops. Requires that pred == node.prev.
    *
    * @param pred node's predecessor holding status
    * @param node the node
    * @return {@code true} if thread should block
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取前驱节点状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    return true;
    if (ws > 0) {
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    // 如果前驱放弃了,就一直往前找,直到找到一个正常等待状态的node,排在它的后面。
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /*
    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don't park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    */
    // 如果前驱正常,就把前驱的状态设置成SIGNAL。
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

    /**
    * Convenience method to park and then check if interrupted
    *
    * @return {@code true} if interrupted
    */
    private final boolean parkAndCheckInterrupt() {
    // 调用park()使线程进入waiting状态
    LockSupport.park(this);
    // 如果被唤醒,查看自己是不是被中断的。
    return Thread.interrupted();
    }

    注意:

    • 队列的尾部插入新的Node节点:由于AQS中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,所以在这里是通过CAS自旋的方式保证了操作的线程安全性。(addWaiter()和enq())
    • park()会让当前线程进入waiting状态。在此状态下,有两种情况可以唤醒该线程:1)被unpark();2)被interrupt()。

    LockSupport类是Java 6 引入的,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

    • park(boolean isAbsolute, long time):阻塞当前线程
    • unpark(Thread jthread):使给定的线程停止阻塞

    所以结点进入等待队列后,是调用park使它进入阻塞状态的。只有头结点的线程是处于活跃状态的

  2. 释放资源(独占)


    /**
    * Releases in exclusive mode. Implemented by unblocking one or
    * more threads if {@link #tryRelease} returns true.
    * This method can be used to implement method {@link Lock#unlock}.
    *
    * @param arg the release argument. This value is conveyed to
    * {@link #tryRelease} but is otherwise uninterpreted and
    * can represent anything you like.
    * @return the value returned from {@link #tryRelease}
    */
    // 释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
    public final boolean release(int arg) {
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    // 唤醒等待队列里的下一个线程
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

    /**
    * Attempts to set the state to reflect a release in exclusive
    * mode.
    *
    * <p>This method is always invoked by the thread performing release.
    *
    */
    // 需要独占模式的自定义同步器去实现
    protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
    }


    /**
    * Wakes up node's successor, if one exists.
    *
    * @param node the node
    */
    private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus;
    // 置零当前线程所在的节点状态,允许失败
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    // 找到下一个需要唤醒的节点
    Node s = node.next;
    // 如果这个后继结点为空或者状态大于0,即这个结点已被取消
    if (s == null || s.waitStatus > 0) {
    s = null;
    // 将等待队列中所有还有用的结点向前移动(从后向前找)
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    // 唤醒
    LockSupport.unpark(s.thread);
    }
  3. 获取资源(共享)

    /**
    * Acquires in shared mode, ignoring interrupts. Implemented by
    * first invoking at least once {@link #tryAcquireShared},
    * returning on success. Otherwise the thread is queued, possibly
    * repeatedly blocking and unblocking, invoking {@link
    * #tryAcquireShared} until success.
    *
    * @param arg the acquire argument. This value is conveyed to
    * {@link #tryAcquireShared} but is otherwise uninterpreted
    * and can represent anything you like.
    */
    // 获取指定量的资源,获取成功则直接返回;获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }

    /**
    * Attempts to acquire in shared mode. This method should query if
    * the state of the object permits it to be acquired in the shared
    * mode, and if so to acquire it.
    *
    * <p>This method is always invoked by the thread performing
    * acquire. If this method reports failure, the acquire method
    * may queue the thread, if it is not already queued, until it is
    * signalled by a release from some other thread.
    *
    * <p>The default implementation throws {@link
    * UnsupportedOperationException}.
    */
    // 返回值:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
    protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
    }

    /**
    * Acquires in shared uninterruptible mode.
    * @param arg the acquire argument
    */
    // 将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,成功拿到相应量的资源后才返回。
    private void doAcquireShared(int arg) {
    // 加入队列尾部
    final Node node = addWaiter(Node.SHARED);
    // 标记是否成功
    boolean failed = true;
    try {
    // 标记等待过程中是否被中断过
    boolean interrupted = false;
    for (;;) {
    // 获取前驱节点
    final Node p = node.predecessor();
    // 如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己
    if (p == head) {
    // 尝试获取资源
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    // 如果等待过程中被打断过,此时将中断补上。
    if (interrupted)
    selfInterrupt();
    failed = false;
    return;
    }
    }
    // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }


    /**
    * Sets head of queue, and checks if successor may be waiting
    * in shared mode, if so propagating if either propagate > 0 or
    * PROPAGATE status was set.
    *
    * @param node the node
    * @param propagate the return value from a tryAcquireShared
    */
    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
    * Try to signal next queued node if:
    * Propagation was indicated by caller,
    * or was recorded (as h.waitStatus either before
    * or after setHead) by a previous operation
    * (note: this uses sign-check of waitStatus because
    * PROPAGATE status may transition to SIGNAL.)
    * and
    * The next node is waiting in shared mode,
    * or we don't know, because it appears null
    *
    * The conservatism in both of these checks may cause
    * unnecessary wake-ups, but only when there are multiple
    * racing acquires/releases, so most need signals now or soon
    * anyway.
    */
    // 如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
    (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
    doReleaseShared();
    }
    }

    流程:

    1)tryAcquireShared()尝试获取资源,成功则直接返回;

    2)失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

  4. 释放资源(共享)

    /**
    * Releases in shared mode. Implemented by unblocking one or more
    * threads if {@link #tryReleaseShared} returns true.
    *
    * @param arg the release argument. This value is conveyed to
    * {@link #tryReleaseShared} but is otherwise uninterpreted
    * and can represent anything you like.
    * @return the value returned from {@link #tryReleaseShared}
    */
    // 释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
    public final boolean releaseShared(int arg) {
    // 尝试释放资源
    if (tryReleaseShared(arg)) {
    // 唤醒后继节点
    doReleaseShared();
    return true;
    }
    return false;
    }


    /**
    * Release action for shared mode -- signals successor and ensures
    * propagation. (Note: For exclusive mode, release just amounts
    * to calling unparkSuccessor of head if it needs signal.)
    */
    private void doReleaseShared() {
    /*
    * Ensure that a release propagates, even if there are other
    * in-progress acquires/releases. This proceeds in the usual
    * way of trying to unparkSuccessor of head if it needs
    * signal. But if it does not, status is set to PROPAGATE to
    * ensure that upon release, propagation continues.
    * Additionally, we must loop in case a new node is added
    * while we are doing this. Also, unlike other uses of
    * unparkSuccessor, we need to know if CAS to reset status
    * fails, if so rechecking.
    */
    for (;;) {
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue; // loop to recheck cases
    // 唤醒后继节点
    unparkSuccessor(h);
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue; // loop on failed CAS
    }
    if (h == head) // loop if head changed
    break;
    }
    }

  5. 其他

    获取资源的方法除了acquire外,还有以下三个:

    • acquireInterruptibly:申请可中断的资源(独占模式)
    • acquireShared:申请共享模式的资源
    • acquireSharedInterruptibly:申请可中断的资源(共享模式)
Author: Jiayi Yang
Link: https://jiayiy.github.io/2020/07/16/AQS/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.