热血江湖私服:ReentrantLock 源码分析以及 AQS (一)

前言

JDK1.5 之后发布了JUC(java.util.concurrent),用于解决多线程并发问题。AQS 是一个特别重要的同步框架,很多同步类都借助于 AQS 实现了对线程同步状态的管理。

AQS 中最主要的就是独占锁和共享锁的获取和释放,以及提供了一些可中断的获取锁,超时等待锁等方法。

ReentranLock 是基于 AQS 独占锁的一个实现。ReentrantReadWriteLock 是基于 AQS 共享锁的一个读写锁实现。本来打算一篇文章里面写完独占锁和共享锁,但是发现篇幅太长了,也不易于消化。

因此,本篇就先结合 ReentrantLock 源码,分析 AQS 的独占锁获取和释放。以及 ReentrantLock 的公平锁和非公平锁实现。

下一篇再写 ReentrantReadWriteLock 读写锁源码,以及 AQS 共享锁的获取和释放。

在正式讲解源码之前,墙裂建议读者做一些准备工作,最好对以下知识有一定的了解,这样阅读起来源码会比较轻松(因为,我当初刚开始接触多线程时,直接看 AQS 简直是一脸懵逼,就像读天书一样。。)。

  1. 了解双向链表的数据结构,以及队列的入队出队等操作。
  2. LockSupport 的 park,unpark 方法,以及对线程的 interrupt 几个方法了解(可参考:LockSupport的 park 方法是怎么响应中断的?)。
  3. 对 CAS 和自旋机制有一定的了解。

AQS 同步队列

AQS 内部维护了一个 FIFO(先进先出)的双向队列。它的内部是用双向链表来实现的,每个数据节点(Node)中都包含了当前节点的线程信息,还有它的前后两个指针,分别指向前驱节点和后继节点。下边看一下 Node 的属性和方法:

static final class Node {
    //可以认为是一种标记,表明了这个 node 是以共享模式在同步队列中等待
    static final Node SHARED = new Node();
    //也是一种标记,表明这个 node 是以独占模式在同步队列中等待
    static final Node EXCLUSIVE = null;

    /** waitStatus 常量值 */
    //说明当前节点被取消,原因有可能是超时,或者被中断。
    //节点被取消的状态是不可逆的,也就是说此节点会一直停留在取消状态,不会转变。
    static final int CANCELLED =  1;
    //说明后继节点的线程被 park 阻塞,因此当前线程需要在释放锁或者被取消时,唤醒后继节点
    static final int SIGNAL    = -1;
    //说明线程在 condition 条件队列等待
    static final int CONDITION = -2;
    //在共享模式中用,表明下一个共享线程应该无条件传播
    static final int PROPAGATE = -3;

    
    //当前线程的等待状态,除了以上四种值,还有一个值 0 为初始化状态(条件队列的节点除外)。
    //注意这个值修改时是通过 CAS ,以保证线程安全。
    volatile int waitStatus;

    //前驱节点
    volatile Node prev;

    //后继节点
    volatile Node next;

    //当前节点中的线程,通过构造函数初始化,出队时会置空(这个后续说,重点强调)
    volatile Thread thread;

    //有两种情况。1.在 condition 条件队列中的后一个节点 
    //2. 一个特殊值 SHARED 用于表明当前是共享模式(因为条件队列只存在于独占模式)
    Node nextWaiter;

    //是否是共享模式,理由同上
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //返回前驱节点,如果为空抛出空指针
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

另外,在 AQS 类中,还会记录同步队列的头结点和尾结点:

    //同步队列的头结点,是懒加载的,即不会立即创建一个同步队列,
    //只有当某个线程获取不到锁,需要排队的时候,才会初始化头结点
    private transient volatile Node head;

    //同步队列的尾结点,同样是懒加载。
    private transient volatile Node tail;

独占锁

这部分就结合 ReentrantLock 源码分析 AQS 的独占锁是怎样获得和释放锁的。

非公平锁

首先,我们从 ReentrantLock 开始分析,它有两个构造方法,一个构造,可以传入一个 boolean 类型的参数,表明是用公平锁还是非公平锁模式。另一个构造方法,不传入任何参数,则默认用非公平锁。

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

NonfairSync 和 FairSync 都继承自 Sync ,它们都是 ReentranLock 的内部类。 而Sync 类又继承自 AQS (AbstractQueuedSynchronizer)。

static final class NonfairSync extends Sync {
}

static final class FairSync extends Sync {
}

abstract static class Sync extends AbstractQueuedSynchronizer {
}

知道了它们之间的继承关系,我们就从非公平锁的加锁方法作为入口,跟踪源码。因为非公平锁的流程讲明白之后,公平锁大致流程都一样,只是多了一个条件判断(这个,一会儿后边细讲,会做对比)。

NonfairSync.lock

我们看下公平锁的获取锁的方法:

final void lock() {
    //通过 CAS 操作把 state 设置为 1
    if (compareAndSetState(0, 1))
        //如果设值成功,说明加锁成功,保存当前获得锁的线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //如果加锁失败,则执行 AQS 的acquire 方法
        acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire

这个方法的逻辑是:

  1. 通过 tryAcquire 方法,尝试获取锁,如果成功,则返回 true,失败返回 false 。
  2. tryAcquire 失败之后,会先调用 addWaiter 方法,把当前线程封装成 node 节点,加入同步队列(独占模式)。
  3. acquireQueued 方法会把刚加入队列的 node 作为参数,通过自旋去获得锁。

tryAcquire

这是一个模板方法,具体的实现需要看它的子类,这里对应的就是 ReentrantLock.NonfairSync.tryAcquire 方法。我们看一下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    //当前线程
    final Thread current = Thread.currentThread();
    //获取当前的同步状态,若为 0 ,表示无锁状态。若大于 0,表示已经有线程抢到了锁。
    int c = getState();
    if (c == 0) {
        //然后通过 CAS 操作把 state 的值改为 1。
        if (compareAndSetState(0, acquires)) {
            // CAS 成功之后,保存当前获得锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // 如果 state 大于0,则判断当前线程是否是获得锁的线程,是的话,可重入。
    else if (current == getExclusiveOwnerThread()) {
        //由于 ReentrantLock 是可重入的,所以每重入一次 state 就加 1 。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

addWaiter

如果获取锁失败之后,就会调用 addWaiter 方法,把当前线程加入同步队列。

private Node addWaiter(Node mode) {
    //把当前线程封装成 Node ,并且是独占模式
    Node node = new Node(Thread.currentThread(), mode);
    //尝试快速入队,如果失败,则会调用 enq 入队方法。enq 会初始化队列。
    Node pred = tail;
    //如果 tail 不为空,说明当前队列中已经有节点
    if (pred != null) { 
        //把当前 node 的 prev 指针指向 tail
        node.prev = pred;
        //通过 CAS 把 node 设置为 tail,即添加到队尾
        if (compareAndSetTail(pred, node)) {
            //把旧的 tail 节点的 next 指针指向当前 node
            pred.next = node;
            return node;
        }
    }
    //当 tail 为空时,把 node 添加到队列,如果需要的话,先进行队列初始化
    enq(node);
    //入队成功之后,返回当前 node
    return node;
}

enq

通过自旋,把当前节点加入到队列中

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //如果 tail为空,说明队列未初始化
        if (t == null) { 
            //创建一个空节点,通过 CAS把它设置为头结点
            if (compareAndSetHead(new Node()))
                //此时只有一个 head头节点,因此把 tail也指向它
                tail = head;
        } else {
            //第二次自旋时,tail不为空,于是把当前节点的 prev指向 tail节点
            node.prev = t;
            //通过 CAS把 tail节点设置为当前 node节点
            if (compareAndSetTail(t, node)) {
                //把旧的 tail节点的 next指向当前 node
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued

入队成功之后,就会调用 acquireQueued 方法自旋抢锁。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前节点的前驱节点
            final Node p = node.predecessor();
            //如果前驱节点就是 head 节点,就调用 tryAcquire 方法抢锁
            if (p == head && tryAcquire(arg)) {
                //如果抢锁成功,就把当前 node 设置为头结点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //抢锁成功后,会把线程中断标志返回出去,终止for循环
                return interrupted;
            }
            //如果抢锁失败,就根据前驱节点的 waitStatus 状态判断是否需要把当前线程挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                //线程被挂起时,判断是否被中断过
                parkAndCheckInterrupt())
                //注意此处,如果被线程被中断过,需要把中断标志重新设置一下
                interrupted = true;
        }
    } finally {
        if (failed)
            //如果抛出异常,则取消锁的获取,进行出队操作
            cancelAcquire(node);
    }
}

setHead

通过代码,我们可以看到,当前的同步队列中,只有第二个节点才有资格抢锁。如果抢锁成功,则会把它设置为头结点。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

需要注意的是,这个方法,会把头结点的线程设置为 null 。想一下,为什么?

因为,此时头结点的线程已经抢锁成功,需要出队了。自然的,队列中也就不应该存在这个线程了。

PS:由 enq 方法,还有 setHead 方法,我们可以发现,头结点的线程总是为 null。这是因为,头结点要么是刚初始化的空节点,要么是抢到锁的线程出队了。因此,我们也常常把头结点叫做虚拟节点(不存储任何线程)。

shouldParkAfterFailedAcquire

以上是抢锁成功的情况,那么抢锁失败了呢?这时,我们需要判断是否应该把当前线程挂起。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取当前节点的前驱节点的 waitStatus
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        //如果 ws = -1 ,说明当前线程可以被前驱节点正常唤醒,于是就可以安全的 park了
        return true;
    if (ws > 0) {
        //如果 ws > 0,说明前驱节点被取消,则会从当前节点依次向前查找,
        //直到找到第一个没有被取消的节点,把那个节点的 next 指向当前 node
        //这一步,是为了找到一个可以把当前线程唤起的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果 ws 为 0,或者 -3(共享锁状态),则把它设置为 -1 
        //返回 false,下次自旋时,就会判断等于 -1,返回 true了
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}   

parkAndCheckInterrupt

如果 shouldParkAfterFailedAcquire 返回 true,说明当前线程需要被挂起。因此,就执行此方法,同时检查线程是否被中断。

private final boolean parkAndCheckInterrupt() {
    //把当前线程挂起,则 acquireQueued 方法的自旋就会暂停,等待前驱节点 unpark
    LockSupport.park(this);
    //返回当前节点是否被中断的标志,注意此方法会把线程的中断标志清除。
    //因此,返回上一层方法时,需要设置 interrupted = true 把中断标志重新设置,以便上层代码可以处理中断
    return Thread.interrupted();
}

想一下,为什么抢锁失败后,需要判断是否把线程挂起?

因为,如果抢不到锁,并且还不把线程挂起,acquireQueued 方法就会一直自旋下去,这样你的CPU能受得了吗。

cancelAcquire

当不停的自旋抢锁时,若发生了异常,就会调用此方法,取消正在尝试获取锁的线程。node 的位置分为三种情况,见下面注释,

private void cancelAcquire(Node node) {

    if (node == null)
        return;

    // node 不再指向任何线程
    node.thread = null;

    Node pred = node.prev;
    //从当前节点不断的向前查找,直到找到一个有效的前驱节点
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    //把 node 的 ws 设置为 -1 
    node.waitStatus = Node.CANCELLED;

    // 1.如果 node 是 tail,则把 tail 更新为 node,并把 pred.next 指向 null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        //2.如果 node 既不是 tail,也不是 head 的后继节点,就把 node的前驱节点的 ws 设置为 -1
        //最后把 node 的前驱节点的 next 指向 node 的后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //3.如果 node是 head 的后继节点,则直接唤醒 node 的后继节点。
            //这个也很好理解,因为 node 是队列中唯一有资格尝试获取锁的节点,
            //它放弃了资格,当然有义务把后继节点唤醒,以让后继节点尝试抢锁。
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

unparkSuccessor

这个唤醒方法就比较简单了,

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从尾结点向前依次遍历,直到找到距离当前 node 最近的一个有效节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //把这个有效节点的线程唤醒,
        //唤醒之后,当前线程就可以继续自旋抢锁了,(回到 park 的地方)
        LockSupport.unpark(s.thread);
}

下面画一个流程图更直观的查看整个获取锁的过程。

公平锁

公平锁和非公平锁的整体流程大致相同,只是在抢锁之前先判断一下是否已经有人排在前面,如果有的话,就不执行抢锁。我们通过源码追踪到 FairSync.tryAcquire 方法。会发现,多了一个 hasQueuedPredecessors 方法。

hasQueuedPredecessors

这个方法判断逻辑稍微有点复杂,有多种情况。

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}
  1. 如果 h == t,说明 h 和 t 都为空(此时队列还未初始化)或者它们是同一个节点(说明队列已经初始化,并且只有一个节点,此时为 enq 方法第一次自旋成功后)。此时,返回false。
  2. 如果 h != t,则判断 head.next == null 是否成立,如果成立,则返回 true。这种情况发生在有其他线程第一次入队时。在 AQS 的 enq 入队方法,设置头结点成功之后 compareAndSetHead(new Node()) ,还未执行 tail = head 时(仔细想一想为什么?)。此时 tail = null , head = new Node(),head.next = null。
  3. 如果 h != t,并且 head.next != null,说明此时队列中至少已经有两个节点,则判断 head.next 是否是当前线程。如果是,返回 false(注意是 false哦,因为用了 !),否则返回 true 。

总结:以上几种情况,只有最终返回 false 时,才会继续往下执行。因为 false,说明没有线程排在当前线程前面,于是通过 CAS 尝试把 state 值设置为 1。若成功,则方法返回。若失败,同样需要去排队。

公平锁和非公平锁区别

举个例子来对比公平锁和非公平锁。比如,现在到饭点了,大家都到食堂打饭。把队列中的节点比作排队打饭的人,每个打饭窗口都有一个管理员,只有排队的人从管理员手中抢到锁,才有资格打饭。打饭的过程就是线程执行的过程。

如果,你发现前面没有人在排队,那么就可以直接从管理员手中拿到锁,然后打饭。对于公平锁来说,如果你前面有人在打饭,那么你就要排队到他后面(图中B),等他打完之后,把锁还给管理员。那么,你就可以从管理员手中拿到锁,然后打饭了。后面的人依次排队。这就是FIFO先进先出的队列模型。

对于非公平锁来说,如果你是图中的 B,当 A 把锁还给管理员后,有可能有另外一个 D 插队过来直接把锁抢走。那么,他就可以打饭,你只能继续等待了。

所以,可以看出来。公平锁是严格按照排队的顺序来的,先来后到嘛,你来的早,就可以早点获取锁。优点是,这样不会造成某个线程等待时间过长,因为大家都是中规中矩的在排队。而缺点呢,就是会频繁的唤起线程,增加 CPU的开销。

非公平锁的优点是吞吐量大,因为有可能正好锁可用,然后线程来了,直接抢到锁了,不用排队了,这样也减少了 CPU 唤醒排队线程的开销。 但是,缺点也很明显,你说我排队排了好长时间了,终于轮到我打饭了,凭什么其他人刚过来就插到我前面,比我还先打到饭,也太不公平了吧,后边一大堆排队的人更是怨声载道。这要是每个人来了都插到我前面去,我岂不是要饿死了。

独占锁的释放

我们从 ReentrantLock 的 unlock 方法看起:

public void unlock() {
    //调用 AQS 的 release 方法
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        //如果头结点不为空,并且 ws 不为 0,则唤起后继节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

这段逻辑比较简单,当线程释放锁之后,就会唤醒后继节点。 unparkSuccessor 已讲,不再赘述。然后看下 tryRelease 方法,公平锁和非公平锁走的是同一个方法。

protected final boolean tryRelease(int releases) {
    //每释放一次锁,state 值就会减 1,因为之前可能有锁的重入
    int c = getState() - releases;
    //如果当前线程不是抢到锁的线程,则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        //只有 state 的值减到 0 的时候,才会全部释放锁
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

因为,ReentrantLock 支持锁的重入,所以每次重入 state 值都会加 1,相应的每次释放锁, state 的值也会减 1 。所以,这也是为什么每个 lock 方法最后都要有一个 unlock 方法释放锁,它们的个数需要保证相同。

当 state 值为 0 的时候,说明锁完全释放。其他线程才可以有机会抢到锁。

结语

以上已经讲解了独占锁主要的获取方法 acquire ,另外还有一些其他相关方法,不再赘述,因为主要逻辑都是一样的,只有部分稍有不同,只要理解了 acquire ,这些都是相通的。如 acquireInterruptibly 方法,它可以在获取锁的时候响应中断。还有超时获取锁的方法 doAcquireNanos 可以设定获取锁的超时时间,超时之后就返回失败。

下篇预告:分析 ReentrantReadWriteLock 读写锁源码,以及 AQS 共享锁的获取和释放,敬请期待。

如果本文对你有用,欢迎点赞,评论,转发。

学习是枯燥的,也是有趣的。我是「烟雨星空」,欢迎关注,可第一时间接收文章推送。

未经允许不得转载: » 热血江湖私服:ReentrantLock 源码分析以及 AQS (一)

赞 (0) 打赏

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏