最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

每日热点:AQS源码学习

来源:博客园

抽象队列同步器AQS

AQS介绍

AQS提供一套框架用于实现锁同步机制,其通过一个 FIFO队列 维护线程的同步状态,实现类只需要继承 AbstractQueuedSynchronizer,并重写指定方法(tryAcquire()/tryRelease()等)即可实现线程同步机制。

AQS 继承结构


(资料图片仅供参考)

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable{    //...}

AbstractOwnableSynchronizer

该类提供基础方法实现独占资源占有线程的关联

package java.util.concurrent.locks;public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {    private static final long serialVersionUID = 3737899427754241961L;    protected AbstractOwnableSynchronizer() { }        // 占有线程    private transient Thread exclusiveOwnerThread;        // 设置占有线程    protected final void setExclusiveOwnerThread(Thread thread) {        exclusiveOwnerThread = thread;    }        // 获取占有线程    protected final Thread getExclusiveOwnerThread() {        return exclusiveOwnerThread;    }}

AQS原理

AQS维护一个CLH (Craig, Landin, and Hagersten) 双向队列,记录头指针head(头指针没有与之对应的线程) 和 尾指针tail,同时维护了一个 volatile int state变量记录同步状态(初始状态默认为0,表示未被该资源未被占用)。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable{    private static final long serialVersionUID = 7373984972572414691L;        // 队列头节点    private transient volatile Node head;        // 队列尾节点    private transient volatile Node tail;        // 同步状态    private volatile int state;        // CAS 原子更新状态    protected final boolean compareAndSetState(int expect, int update) {        // See below for intrinsics setup to support this        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }}

申请锁 -> lock() 执行过程

ReentrantLock为例子,该锁默认实现是一个非公平独占锁

public static void main(String[] args) {    // 独占锁、默认非公平锁ReentrantLock reentrantLock = new ReentrantLock();    reentrantLock.lock();}// ReentrantLock.lock()public void lock() {    sync.lock();}// NofairSync.lock()final void lock() {    // CAS 获取锁    if (compareAndSetState(0, 1))        setExclusiveOwnerThread(Thread.currentThread());    else        // 获取锁失败        acquire(1);}public final void acquire(int arg) {    // tryAcquire: 尝试获取锁    // acquireQueued: 添加到阻塞队列    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

tryAcquire执行链: 尝试获取锁,获取不到则返回 false

// NofairSync.tryAcquire()protected final boolean tryAcquire(int acquires) {    return nonfairTryAcquire(acquires);}// Sync.nofairTryAcquire()final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        if (compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

acquireQueued执行链:

  1. 首先通过 addWaiter 方法将线程添加到队列尾部
  2. 然后通过 acquireQueued 方法实现线程进入CLH队列后如何被阻塞或者被唤醒获取锁
// AbstractQueuedSynchronizer.addWaiter()// 添加 node 到等待队列尾部// 返回插入的节点 nodeprivate Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }        // tail == null    enq(node);    return node;}// 线程进入等待队列之后,如何获取锁或者继续阻塞final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();                        // 如果当前节点的前驱节点为 head,则竞争锁资源            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }                        // 当前节点的前驱节点不是 head, 或者竞争锁失败            // shouldParkAfterFailedAcquire: true, 调用 parkAndCheckInterrupt() 阻塞线程            // shouldParkAfterFailedAcquire: false, 再次进入循环块,竞争锁            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            // for循环意外退出才能走到这            cancelAcquire(node);    }}/** * 判断当前线程是否需要阻塞 * 阻塞(return true): *      1.前驱节点的状态 pred.waitStatus=SIGNAL * 不阻塞(return false): *      1.前驱节点的状态为 CANCELLED,循环向前找 ws <= 0 的前驱节点 *      2.前驱节点的状态 ws = 0 || ws = PROPAGATE */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {        /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        /*         * waitStatus must be 0 or PROPAGATE.  Indicate that we         * need a signal, but don"t park yet.  Caller will need to         * retry to make sure it cannot acquire before parking.         */        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

释放锁 -> unlock() 执行过程

同样以 ReentrantLock为例子,该锁默认实现是一个非公平独占锁

public static void main(String[] args) {    // 独占锁、默认非公平锁ReentrantLock reentrantLock = new ReentrantLock();    reentrantLock.lock();        try {        // 业务代码    } catch (Exception e) {        e.printStackTrace();    } finally {        reentrantLock.unlock();    }}// ReentrantLock.unlock()public void unlock() {    sync.release(1);}

release执行链:

  1. 通过 tryRelease 方法判断当前锁是否已经被完全释放
  2. 如果已经被完全释放 -> 则唤醒其后继节点对应的线程
// AbstractQueuedSynchronizer.release()// tryRelease() 返回 true -> 则执行 if 中的逻辑 -> unparkSuccessor: 唤醒后继节点public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}// 释放锁,修改 state// free: true 锁已经完全释放,唤醒其他线程竞争// free: false 锁仍然被当前线程占有protected final boolean tryRelease(int releases) {    int c = getState() - releases;    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    if (c == 0) {        free = true;        setExclusiveOwnerThread(null);    }    setState(c);    return free;}

unparkSuccessor: 唤醒后继节点

  • 情况1:直接唤醒当前节点的后继节点
  • 情况2: 情况1对应的节点状态为 CANCELLED,则从CLH队列尾部开始寻找 ws <= 0 的节点唤醒
/** * 唤醒后继节点 *  * waitStatus: *     CANCELLED(1)  : 当前节点因超时或响应中断结束调度,进入该状态的节点不再变化 *     SIGNAL(-1)    : 后继节点等待当前节点唤醒 *     CONDITION(-2) : 当前节点处于 condition 上,等待转移到CLH同步队列 *     PROPAGETE(-3) : 当前节点处于 shared 状态 *     0             : 默认状态 */private void unparkSuccessor(Node node) {    /*     * If status is negative (i.e., possibly needing signal) try     * to clear in anticipation of signalling.  It is OK if this     * fails or if status is changed by waiting thread.     */    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 情况1:直接唤醒当前节点的后继节点    // 情况2: 情况1对应的节点状态为 CANCELLED,则从CLH队列尾部开始寻找 ws <= 0 的节点唤醒    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        LockSupport.unpark(s.thread);}

FairSync && NofairSync

FairSync: 以 ReentrantLock 的公平锁实现为例

static final class FairSync extends Sync {    private static final long serialVersionUID = -3000897897090466540L;    final void lock() {        acquire(1);    }    protected final boolean tryAcquire(int acquires) {        final Thread current = Thread.currentThread();        int c = getState();        if (c == 0) {            // 判断 CLH 队列是否有正在等待的线程,如果有,则唤醒CLH 队列 head 的后继节点            if (!hasQueuedPredecessors() &&                compareAndSetState(0, acquires)) {                setExclusiveOwnerThread(current);                return true;            }        }        else if (current == getExclusiveOwnerThread()) {            int nextc = c + acquires;            if (nextc < 0)                throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }        return false;    }}

NofairSync: 以 ReentrantLock 的非公平锁实现为例

static final class NonfairSync extends Sync {    private static final long serialVersionUID = 7316153563782823691L;    /**     * Performs lock.  Try immediate barge, backing up to normal     * acquire on failure.     */    final void lock() {        // CAS 抢锁,如果恰巧没有线程占有,则直接获取锁返回        if (compareAndSetState(0, 1))            setExclusiveOwnerThread(Thread.currentThread());        else            // 抢锁失败,则进入 acquire            acquire(1);    }    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}final boolean nonfairTryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    if (c == 0) {        // 同样进行 CAS 抢锁,而不是判断 CLH 队列中是否有等待线程        if (compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }        // 抢锁失败,则进入     return false;}

所以,公平锁和非公平锁的区别总结如下:

  1. 非公平锁调用 lock()方法,会马上进行一次 CAS 抢占锁
  2. 抢占锁失败后进入 tryAcquire()方法,公平锁会去判断CLH等待队列是否有线程处于等待状态,如果有则不抢占锁;非公平锁则会直接进行 CAS 尝试抢占锁

[^]: 注:以上源代码阅读与分析,基于 Oracle JDK8 版本

关键词: