Lundrea Lundrea's技术博客, 专注于后端与基础架构!

Java ReentrantLock原理

2024-04-28
Lundrea

ReentrantLockJava JUC(java.util.concurrent) 包下的一个锁工具,它实现了 Lock 接口,与 synchronized 锁不同,ReentrantLock 除了用来做线程间互斥之外,还提供了很多高级的特性,例如公平锁 & 非公平锁以及可中断

本文将从 JDK17 源码角度介绍一下 ReentrantLock 的底层实现原理,这部分是 Java 面试的常考知识点。(目前看到的博客都是基于 JDK8 源码分析的,而鲜有基于 JDK17 源码进行分析的)

2024美团暑期实习后端开发一面:公平锁与非公平锁是如何实现的? 看完这篇文章你就明白了!

AQS

AQS 的全称为 AbstractQueueSynchronizer ,即抽象队列同步器,通俗来讲,AQS 的作用就是来定义线程如何获得锁、线程未获得锁如何等待以及线程如何释放锁。ReentrantLock 的底层实现是高度依赖 AQS 的,这一点从源码中就可以看得出来:

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ···
    }
    ···
}

ReentrantLock 内部有一个 Sync 类型的对象,Sync 这个类则是继承自 AbstractQueuedSynchronizer,也就是抽象队列同步器,这个锁的同步控制基础就是这个 Sync 提供的,可以注意到这里的 Sync 仍然是抽象类,其实它还有两个子类,分别是 FairSyncNonFairSync ,分别对应于公平锁的实现和非公平锁的实现。

AQS 底层是一个 CLH 队列,是一个双向队列,由 Node 节点连接而成,这部分代码位于 AbstractQueuedSynchronizer 抽象类中,

/** CLH Nodes */
abstract static class Node {
    volatile Node prev;       // initially attached via casTail
    volatile Node next;       // visibly nonnull when signallable
    Thread waiter;            // visibly nonnull when enqueued
    volatile int status;      // written by owner, atomic bit ops by others
    
    ···
}

可以看到,一个 Node 包含指向前后 Node 的指针(prev 和 next),一个在等待的线程对象(waiter)以及状态(status)。这里的状态指的是这个线程对象的状态,在 JDK17 中,这个状态只有三种,但在 JDK8 中这里的状态数量更多。

// Node status bits, also used as argument and return values
static final int WAITING   = 1;          // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND      = 2;          // in a condition wait

其中 WAITING 表示线程在等待,CANCELLED 表示线程已取消获取锁(用于实现锁的可中断),COND 表示线程处于一个条件等待的状态。

除了 CLH 队列外,AQS 中还有一个状态字段,即:

/**
 * The synchronization state.
*/
private volatile int state;

用于标识锁目前是否被占用,等于 0 则锁空闲,大于 0 则锁被占用。

一个获取锁失败的线程会被封装成为一个 Node 对象放入到队列中排队等待获取锁。

知道了 AQS 的概念,现在我们可以通过 ReentrantLock 加锁的全流程来窥探到 ReentrantLock 的底层原理了。

ReentrantLock 加锁流程

首先是实例化一个 ReentrantLock 锁对象,这里构造函数可带参数,如果参数为 True ,则会实例化一个公平锁,默认为非公平锁,其实区别就在于是实例化了一个 FairSync 对象还是一个 NonFairSync 对象。

// 实例化一个锁
Lock lock = new ReentrantLock();

// 构造函数的实现
// 非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
// 公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

之后调用 lock() 方法加锁,其实底层是调用了 sync 对象的 lock() 方法来加锁,看到这里,你可能更能理解为什么说 ReentrantLock 的底层实现是高度依赖 AQS 的。

public void lock() {
    sync.lock();
}

sync.lock() 的实现如下:

final void lock() {
    if (!initialTryLock())
        acquire(1);
}

这里 initialTryLock() 方法是一个抽象方法,实际上调用的是 Sync 这个类的两个子类 FairSyncNonFairSync 的重写的 initialTryLock() 方法。

我们先看非公平锁 FairSync 的实现:

/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        if (compareAndSetState(0, 1)) { // first attempt is unguarded
            setExclusiveOwnerThread(current);
            return true;
        } else if (getExclusiveOwnerThread() == current) {
            int c = getState() + 1;
            if (c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        } else
            return false;
    }

    /**
     * Acquire for non-reentrant cases after initialTryLock prescreen
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

非公平锁的 initialTryLock() 方法通过 CAS 尝试加锁(即 compareAndSetState(0, 1)),如果 AQSstate 字段为 0 ,则把这个字段变为 1 ,并使用 setExclusiveOwnerThread(current); 将独占锁的线程设置为当前线程即自己,否则加锁失败,失败之后会判断当前独占锁的线程是不是当前线程。如果是的话将 state 加一,同样加锁成功,这里就体现了可重入锁的实现,即同一个线程可以多次获取锁而不阻塞,state 字段其实就是这个线程获取锁的次数。如果这两种情况都没有加锁成功,则认为锁被另一个线程独占,加锁失败,返回 false

在第一次 CAS 加锁未成功时,会调用 acquire(1) 这个方法,这个方法是 AbstractQueuedSynchronizer 抽象类提供给我们的,接下来我们看看这个方法的实现。

public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

首先调用 tryAcquire(arg) 方法,这里会去调用具体实现类的 tryAcquire() 方法,同样,我们先看非公平锁的实现,实际上上面已经给出,这里只摘出 tryAcquire() 的实现:

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

这里是尝试了第二次 CAS 加锁,与 initialTryLock() 方法中的逻辑差不多,同样也是做 CAS 尝试,如果成功,将独占这个锁的线程设置为自己,如果不成功则返回 false

如果第二次 CAS 加锁不成功,则调用 acquire(null, arg, false, false, false, 0L) 方法,这个方法极其重要,它的内部定义了线程如何排队来获取锁的逻辑。

下面是这个方法的完整实现,我添加了较为详细的注释便于理解:

final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    // node 是将要加入 DLH 队列的节点
    // spins 是队头节点在竞争锁时可以自旋的次数
    // interrupted 当前线程是否中断获取锁
    // first 当前线程是否位于队列第一个节点中
    // pred 当前节点的前一个节点
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */
    
    // 一个大循环,来控制线程对锁的竞争
    for (;;) {
        // 如果当前节点不为第一个节点、前一个节点也不为空、头结点也不等于前一个节点
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            // 如果前一个节点状态小于 0,说明前驱是一个取消了获取锁请求的线程
            // 则清理队列,去除掉取消获取锁的节点
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            // 如果前驱的前驱为空,
            } else if (pred.prev == null) {
                // 这是一个自旋等待提示,告诉CPU当前线程正在自旋等待,
                // 处理器可以进行一些针对自旋的优化
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        // 如果当前节点是第一个节点或者前驱为空,则竞争锁
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    // CAS 获取锁
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            // 如果成功获取到了锁
            if (acquired) {
                // 如果还是第一个,把这个节点从队列中拿下来,赋值给 head
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        // 如果当前节点为空
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                // 创建节点
                node = new ExclusiveNode();
        // 如果前驱为空,入队
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        // 如果是第一个节点且自旋次数不为 0 自旋次数减,且调用 onSpinWait()
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        // 如果当前节点状态为 0 ,说明是新节点,初始化状态为 WAITTING
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        // 重新计算自旋次数,并且由于当前线程自旋次数用完,被 park 住
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

这段核心代码的作用在于控制未获得锁的线程入队,同时让队列头部的线程参与争夺锁。需要注意的是,这里的 head 指向抢到锁的线程对应的 node

首先第一个 if 中,

  1. 如果发现有取消获取锁的线程,会执行队列清理,
  2. 如果自己是第二个节点,那么会一直自旋来判断前一个线程是否放弃了获取锁,防止队头线程已经放弃了获取锁,而在队列之后的线程还不知道,造成饥饿。

第二个 if 中,

  1. 如果当前节点是队列中的第一个节点或者前驱为空,则参与争夺锁,调用了 tryAcquire() 方法,也就是说争夺锁的线程只有队列中的第一个 Node 中的线程、刚释放了锁的那个线程以及还尚未进入这个函数的线程(在进入这个函数之前会尝试两次 CAS 来获取锁)。

第三个 if 中,

  1. 如果当前节点还为空,则实例化一个 node
  2. 如果前驱为空,则入队,
  3. 如果当前节点为队列中第一个节点,且自旋次数(spins)未用尽,则 -1 ,之后接着做下一次自旋
  4. 如果当前节点状态为 0 ,说明是新节点,将状态设置为 WAITTING
  5. 最后 else 中会先计算当前线程的自旋次数 spins,并将当前线程 park

按照源码中 spins 的计算方法,每当 spins 用尽,下一次的自旋次数为上一次的自旋次数的 2 倍 +1 。

可以看到,队列中第一个 node 中的线程等待的时间越长,其可以自旋的次数就会越多,这样可以有效地避免队列中的线程饥饿,因为自旋次数越多,获取锁的概率越大,所以随着等待时间的增加,获取锁的概率其实也是增加的。JDK17中是这样的,而在 JDK8 中却有所不同,JDK8 中线程每次被唤醒只会自旋一次就迅速被 park 住,这样就比较容易造成饥饿。

概括来讲整个加锁过程,一个线程首先会自旋两次尝试获取锁,如果这两次都没有获取到锁,则进入到 acquire() 方法的大循环中,队列中的所有在等待的线程都会被 park 住, 直到第一个线程被唤醒,然后开始自旋,直到自旋次数用尽,被再次 park 住……

你可能会问,那么谁来唤醒队列中第一个等待的线程呢?当然是刚刚释放锁的那个线程啦~

解锁也是调用了 sync 对象的 release() 方法,下面是解锁部分的源码:

public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}

private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}

tryRelease() 方法尝试解锁,如果成功解锁,则调用 signalNext() 方法,唤醒队列中的第一个线程,于是队列中的第一个线程就加入到了获取锁的行列中来。

这就是 ReentrantLock 加锁过程及其底层原理,你可能会疑惑为什么前面都在讲非公平锁,那公平锁呢?

公平锁

其实公平锁的实现与非公平锁及其类似,只有一处不同,具体可以看公平锁的源码:

/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    /**
     * Acquires only if reentrant or queue is empty.
     */
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    /**
     * Acquires only if thread is first waiter or empty
     */
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && !hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

可以看到,与非公平锁不同的是,在公平锁在决定是否要去抢锁之前会有一个额外的判断,也就是会调用 hasQueuedPredecessors() 方法,这个方法的作用在于判断队列中是否有等待处理的线程,如果没有,当前线程才会去抢锁,如果队列中有线程则无法抢锁,这样就保证了,线程可以按照自己在队列中的顺序公平地获得锁。


Similar Posts

上一篇 拓扑排序

Comments