简介
是AbstractQueuedSynchronizer的简称,是一个用来构建锁和同步器的框架,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。
AQS源码分析
内部数据结构
它维护了一个volatile int state(代表共享资源)和一个FIFO线程等待双端队列(多线程争用资源被阻塞时会进入此队列),并使用了两个指针head和tail用于标识队列的头部和尾部。
|
CLH队列并不是直接存储线程,而是存储拥有线程的Node节点
Node的结构:
static final class Node { |
资源共享模式
资源有两种共享模式,或者说两种同步方式:
- 独占模式(Exclusive):资源是独占的,一次只能一个线程获取。如ReentrantLock。
- 共享模式(Share):同时可以被多个线程获取,具体的资源个数可以通过参数指定。如Semaphore/CountDownLatch。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占模式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占模式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享模式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享模式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
这些方法虽然都是protected
方法,但是它们并没有在AQS具体实现,而是直接抛出异常,因为AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现。
protected boolean tryAcquire(int arg) { |
这里没有定义成abstract的原因:
独占模式下只需要实现tryAcquire-tryRelease,而共享模式下只需要实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。
(一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如
ReadWriteLock
。)
核心部分的源码分析
获取资源(独占)
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
// arg是要获取的资源的个数,在独占模式下始终为1。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}函数流程如下:
1)tryAcquire()尝试直接去获取资源,如果成功则直接返回(这里体现了非公平锁,每个线程获取锁时会尝试直接抢占加塞一次,而CLH队列中可能还有别的线程在等待);
2)addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3)acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;
4)如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*/
// 尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。
// 需自定义同步器去实现 tryLock()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) {
// 生成该线程对应的Node节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用CAS尝试,成功则返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果等待队列为空或者上述CAS失败,通过自旋CAS插入(enq方法)
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 将node加入队尾
private Node enq(final Node node) {
// CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
if (compareAndSetHead(new Node()))
tail = head;
} else {
//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
/**
* 1. 结点进入队尾后,检查状态,找到安全休息点;
* 2. 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
* 3. 被唤醒后,看自己是否可以获取到资源。如果拿到,head指向当前结点,并返回从入队到获取到的整个过程中是否被中断过;如果没获取到,继续流程1。
*/
final boolean acquireQueued(final Node node, int arg) {
// 标记是否成功拿到资源
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
// 自旋
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 若前驱结点p是head,说明node是第二个结点,可以尝试去获取资源
if (p == head && tryAcquire(arg)) {
// 拿到资源后,将head指向该结点。
// 所以head所指的结点,就是当前获取到资源的那个结点或null。
setHead(node);
// setHead中node.prev已置为null,此处再将head.next置为null,是为了方便GC回收以前的head结点。(意味着将之前拿完资源的结点出队)
p.next = null; // help GC
// 成功获取资源
failed = false;
//返回等待过程中是否被中断过
return interrupted;
}
// 如果自己可以休息了,就进入waiting状态,直到被unpark()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 如果前驱放弃了,就一直往前找,直到找到一个正常等待状态的node,排在它的后面。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果前驱正常,就把前驱的状态设置成SIGNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// 调用park()使线程进入waiting状态
LockSupport.park(this);
// 如果被唤醒,查看自己是不是被中断的。
return Thread.interrupted();
}注意:
- 队列的尾部插入新的Node节点:由于AQS中会存在多个线程同时争夺资源的情况,因此肯定会出现多个线程同时插入节点的操作,所以在这里是通过CAS自旋的方式保证了操作的线程安全性。(addWaiter()和enq())
- park()会让当前线程进入waiting状态。在此状态下,有两种情况可以唤醒该线程:1)被unpark();2)被interrupt()。
LockSupport类是Java 6 引入的,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:
- park(boolean isAbsolute, long time):阻塞当前线程
- unpark(Thread jthread):使给定的线程停止阻塞
所以结点进入等待队列后,是调用park使它进入阻塞状态的。只有头结点的线程是处于活跃状态的。
释放资源(独占)
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// 释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒等待队列里的下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
*/
// 需要独占模式的自定义同步器去实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 置零当前线程所在的节点状态,允许失败
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 找到下一个需要唤醒的节点
Node s = node.next;
// 如果这个后继结点为空或者状态大于0,即这个结点已被取消
if (s == null || s.waitStatus > 0) {
s = null;
// 将等待队列中所有还有用的结点向前移动(从后向前找)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒
LockSupport.unpark(s.thread);
}获取资源(共享)
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
// 获取指定量的资源,获取成功则直接返回;获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*/
// 返回值:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
// 将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,成功拿到相应量的资源后才返回。
private void doAcquireShared(int arg) {
// 加入队列尾部
final Node node = addWaiter(Node.SHARED);
// 标记是否成功
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己
if (p == head) {
// 尝试获取资源
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果等待过程中被打断过,此时将中断补上。
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// 如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}流程:
1)tryAcquireShared()尝试获取资源,成功则直接返回;
2)失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
释放资源(共享)
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
// 释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}其他
获取资源的方法除了acquire外,还有以下三个:
- acquireInterruptibly:申请可中断的资源(独占模式)
- acquireShared:申请共享模式的资源
- acquireSharedInterruptibly:申请可中断的资源(共享模式)