概念
它(AbstractQueuedSynchronized队列同步器)是一个抽象类,是用来构建锁或者其他同步组件的基础,它使用了一个int的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。通过子类继承然后重写里面的一些抽象方法,就像tryAcquire这些都是需要子类重写不然直接会直接抛出异常,aqs这个类本身没有实现任何的同步接口,子类里面的方法如果要修改state的状态有三个方法(getState(),setState(int newState),compareAndSetState(int expect,int update))
同步器既支持独占获取同步状态也支持共享获取同步状态,比如像ReentrantLock,ReentrantReadWriteLock和CountDownLatch等。
锁是面向我们使用者的,而aqs是面向实现者的。
方法介绍
static {
try {
//这里静态成员变量优先加载,静态代码块才会被加载
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronize.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
在AQS这个类第一次被加载的时候就会通过反射的方式获取到这个类里面的state,head,tail,waitStatus,next这五个成员变量在内存当中的便宜量也就是地址
独占式获取同步状态的模版方法
@ReservedStackAccess
public void acquire(int arg){
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
独占式的获取同步状态,获取成功返回,获取失败进入同步队列等待,这个方法调用重写后的tryAcquire方法,线程池里面的Worker和ReentrantLock的非公平和公平锁都会重写这个方法
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//当前线程处于中断的状态会立刻抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//不是的话会去尝试获取同步状态
if (!tryAcquire(arg))
//获取不到的话创建节点入队
doAcquireInterruptibly(arg);
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
在acquireInterruptibly上加了一个超时的限制,如果当前线程在超时时间没有获取到同步状态将会返回false,获取到了返回true。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
独占式的释放同步状态,该方法会在释放同步状态以后,去调用unparkSuccessor这个方法去查找当前节点的后继节点,一直到找到一个未被取消的waitStatus<=0的节点,把他标记成Signal(-1),尝试唤醒他。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//不为空直接唤醒后续节点
if (s != null)
LockSupport.unpark(s.thread);
}
如上,在释放锁的过程中调用这个方法来找到当前节点的后驱节点并唤醒,这个时候后驱节点继续自旋,因为后驱节点的prev节点是头节点然后cas获取同步状态,然后把后驱节点设置成头节点,prev变成null,原头节点的next变为null和prev都变为null反方便gc回收,原头节点出队,所以头节点的出队是靠头节点的后驱节点里面的线程来完成的,而后驱节点的唤醒是靠他的前驱节点
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//死循环判断当前节点的前置节点是不是头节点,如果是并且cas成功就把当前节点设置成头节点,前置节点的next指向置为null
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//判断是否需要将当前线程挂起等待,并且挂起等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
waitStatus的状态表示含义
INITAL 值为0,初始化状态,当前节点再sync队列中,等待获取锁
CANCELLED 值为1,由于在同步队列中等待的线程等待超时或者被中断,那么这个线程会被取消,节点进入这个状态以后就不会再有变化了
SIGNAL 值为-1,表示当前节点的后继节点需要运行,也就是unpark
CONDITION 值为-2, 表示当前节点在等待condition 也就是在condition队列中
PROPAGATE 值为-3 ,表示当前场景下后续的acquireShared可以运行
评论区