主页 > 手机  > 

详解AbstractQueuedSynchronizer(AQS)源码

详解AbstractQueuedSynchronizer(AQS)源码
引言

上篇文章讲解了CountDownLatch源码,底层是继承了AQS基类调用父类和重写父类方法实现的,本文将简介AQS源码和架构设计,帮助我们更深入理解多线程实战。

源码架构

1.  状态变量 state AQS 使用一个 int 类型的变量 state 来表示同步状态。这个变量是 volatile 修饰的,保证了多线程之间的可见性。不同的同步器对 state 有不同的定义和使用方式,例如在 ReentrantLock 中,state 表示锁的重入次数;在 CountDownLatch 中,state 表示计数器的值。

private volatile int state;

2. 双向队列 AQS 内部维护了一个双向链表队列,用于存储等待获取锁的线程。这个队列是 CLH(Craig, Landin, and Hagersten)锁队列的变种。每个节点(Node)代表一个等待线程,包含线程引用、等待状态等信息。

static final class Node { // 共享模式 static final Node SHARED = new Node(); // 独占模式 static final Node EXCLUSIVE = null; // 线程已取消 static final int CANCELLED = 1; // 后继节点需要被唤醒 static final int SIGNAL = -1; // 线程在等待条件 static final int CONDITION = -2; // 表示下一次共享式同步状态获取将会无条件地传播下去 static final int PROPAGATE = -3; // 等待状态 volatile int waitStatus; // 前驱节点 volatile Node prev; // 后继节点 volatile Node next; // 节点所包装的线程 volatile Thread thread; // 指向下一个处于 CONDITION 状态的节点 Node nextWaiter; ... }

而双向队列细分为条件队列和同步队列

同步队列

同步队列是 AQS 的核心队列,用于存放等待获取锁的线程节点。当线程尝试获取锁失败时,会被封装成节点加入到这个队列中等待。

在 AQS 类中有两个关键属性用于表示同步队列的头节点和尾节点:

private transient volatile Node head; private transient volatile Node tail;

head 指向队列的头节点,tail 指向队列的尾节点,这两个属性都是 volatile 修饰的,保证了多线程环境下的可见性。

节点加入同步队列(addWaiter 方法)

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试快速将节点插入到队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 快速插入失败,使用 enq 方法插入 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 队列还未初始化 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

条件队列

条件队列用于实现线程的等待 - 通知机制,一个 AQS 可以有多个条件队列,每个 Condition 对象对应一个条件队列。

关键类 ConditionObject

ConditionObject 是 AQS 中实现条件队列的内部类,它包含一个指向条件队列头节点和尾节点的引用

public class ConditionObject implements Condition, java.io.Serializable { private transient Node firstWaiter; private transient Node lastWaiter; ... }

线程进入条件队列(await 方法)

public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter() { Node t = lastWaiter; // 如果尾节点已取消,清除队列中所有已取消的节点 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }

唤醒条件队列中的线程

public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // 将节点的等待状态从 CONDITION 改为 0 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将节点从条件队列转移到同步队列 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

标签:

详解AbstractQueuedSynchronizer(AQS)源码由讯客互联手机栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“详解AbstractQueuedSynchronizer(AQS)源码