当前位置:   article > 正文

【Java并发编程之美 | 第三篇】Java 并发包中锁原理剖析之AQS、ReentrantLock独占可重入锁

【Java并发编程之美 | 第三篇】Java 并发包中锁原理剖析之AQS、ReentrantLock独占可重入锁

在这里插入图片描述

3.Java 并发包中锁原理剖析

3.1AQS——抽象队列同步器

3.1.1AQS定义

  1. AQS,全称是 ,即抽象队列同步器,许多同步类的实现都依赖于它,如 ReentrantLock、Semaphore、CountDownLatch 等。
  2. AQS 的思想:如果被请求的共享资源空闲,则当前线程能够成功获取资源;否则,它将进入一个等待队列,当有其他线程释放资源时,系统会挑选等待队列中的一个线程,赋予其资源

3.1.2AQS数据结构

(1)组成部分
  1. AQS 主要由三部分组成
    1. state 同步状态
    2. Node 组成的 CLH 同步队列
    3. ConditionObject 条件变量(包含 Node 组成的条件单向队列)。
(2)state关键字
  1. 对于 AQS 来说,线程同步的关键是对 state 的操作,可以说获取、释放资源是否成功都是由 state 决定的

  2. 比如 state>0 代表可获取资源,否则无法获取,所以 state 的具体语义由实现者去定义,现有的 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 定义的 state 语义都不一样

    1. ReentrantLock 的 state 用来表示是否有锁资源,变量记录了锁的重入次数
    2. ReentrantReadWriteLock 的 state 高 16 位代表读锁状态,低 16 位代表写锁状态
    3. Semaphore 的 state 用来表示可用信号的个数
    4. CountDownLatch 的 state 用来表示计数器的值
  3. getState、setState、以及compareAndSwapInt三个方法均是原子操作,其中 compareAndSetState 的实现依赖于Unsafe类的compareAndSwapInt()方法

/**
 * The synchronization state.
 */
private volatile int state;

getState()
setState()
compareAndSetState()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
(3)同步队列
①CLH
  1. CLH 锁是有由 Craig, Landin, and Hagersten 这三个人发明的锁,取了三个人名字的首字母,所以叫 CLH Lock
  2. 同步队列是基于链表实现的双向队列,也是 CLH 锁的变种。CLH 锁是 AQS 队列同步器实现的基础
  3. 以下图为CLH的构成

img

②Node
  1. AQS 以内部类 Node 的形式定义了同步队列结点
static final class Node {

    /** 模式定义 */

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** 线程状态 */

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 线程等待状态 */
    volatile int waitStatus;

    /** 前驱结点 */
    volatile Node prev;
    /** 后置结点 */
    volatile Node next;

    /** 持有的线程对象 */
    volatile Thread thread;

    /** 对于独占模式而言,指向下一个处于 CONDITION 等待状态的结点;对于共享模式而言,则为 SHARED 结点 */
    Node nextWaiter;

    // ... 省略方法定义
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

waitStatus 有如下 5 中状态:

  • CANCELLED = 1 表示当前结点已取消调度。当超时或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
  • SIGNAL = -1 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。
  • CONDITION = -2 表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION 状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE = -3 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
  • INITIAL = 0 新结点入队时的默认状态
③主要行为
  1. AQS 类成员变量 head 和 tail 字段分别指向同步队列的头结点和尾结点:
	 /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

img

同步队列的主要行为是 :入队、出队

  • 入队
    • 获取资源失败的线程需要封装成 Node 节点,接着尾部入队,在 AQS 中提供 addWaiter 函数完成 Node 节点的创建与入队
    • 添加节点的时候,如 CLH 队列已经存在,将新节点加到 tail 后面然后对 tail 进行 CAS 操作,将 tail 指针后移到新节点上。==
    • 如果添加失败或队列不存在,则初始化同步队列。
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 出队
    • CLH 队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将 head.next 指向的线程节点唤醒(CLH 队列的第二个节点)
    • 如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点)
 protected final boolean tryRelease(int releases) {
        int c = getState() - releases; 
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 如果 state=0 了,就是可以释放锁了
            free = true; 
            setExclusiveOwnerThread(null); // 将拿锁线程置为 null
        }
        setState(c); // 重置同步器的 state
        return free; // 返回是否成功释放
  }

  private void unparkSuccessor(Node node) {
    // node 节点是当前释放锁的节点,也是同步队列的头节点
    int ws = node.waitStatus;
    // 如果节点已经被取消了,把节点的状态置为初始化
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 拿出队二 s
    Node s = node.next;
    // s 为空,表示 node 的后一个节点为空
    // s.waitStatus 大于 0,代表 s 节点已经被取消了
    // 遇到以上这两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的
    if (s == null || s.waitStatus > 0) {
        s = null;
     
        // 结束条件是前置节点就是 head 了
        for (Node t = tail; t != null && t != node; t = t.prev)
            // t.waitStatus <= 0 说明 t 当前没有被取消,肯定还在等待被唤醒
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒以上代码找到的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

总结:出队列,锁释放唤醒 head 的后继节点,head 的后继节点从阻塞中醒来,开始抢锁,获取锁成功此时 head 指针向后移一个位置,原先 head 的后继节点成为新的 head

(4)条件队列
  1. 一个 AQS 可以对应多个条件变量,每一个条件变量对应这一个条件队列
  2. ConditionObject 内部维护着一个单向条件队列,不同于 CLH 队列,条件队列只入队执行 await 的线程节点,并且加入条件队列的节点,不能在 CLH 队列, 条件队列出队的节点,会入队到 CLH 队列
  3. 当某个线程执行了 ConditionObject 的 await 函数,阻塞当前线程,线程会被封装成 Node 节点添加到条件队列的末端
  4. 其他线程执行 ConditionObject 的 signal 函数,会将条件队列头部线程节点转移到 CLH 队列参与竞争资源,具体流程如下图:

img

img

  1. 一个 Condition 对象就有一个单项的等待任务队列。在一个多线程任务中我们可以 new 出多个等待任务队列。比如我们 new 出来两个等待队列。
 private Lock lock = new ReentrantLock();
 private Condition FirstCond = lock.newCondition();
 private Condition SecondCond = lock.newCondition();
  • 1
  • 2
  • 3

3.1.3AQS的同步方式

  1. AQS支持两种同步方式:
    1. 独占模式: 这种方式下,每次只能有一个线程持有锁,例如 ReentrantLock。
    2. 共享模式: 这种方式下,多个线程可以同时获取锁,例如 Semaphore 和 CountDownLatch。
  2. 子类可以通过继承 AQS 并实现它的方法来管理同步状态:
    1. tryAcquire:独占方式尝试获取资源,成功则返回 true,失败则返回 false;
    2. tryRelease:独占方式尝试释放资源;
    3. tryAcquireShared(int arg):共享方式尝试获取资源;
    4. tryReleaseShared(int arg):共享方式尝试释放资源;
    5. isHeldExclusively():该线程是否正在独占资源。
  3. 如果共享资源被占用,需要一种特定的阻塞等待唤醒机制来保证锁的分配,AQS 会将竞争共享资源失败的线程添加到一个 CLH 队列中

三分恶面渣逆袭:CLH队列

3.2ReentrantLock——独占可重入锁

3.2.1ReentrantLock实现原理

  1. ReentrantLock是可重入的独占锁,只能有一个线程可以获取该锁,其它获取该锁的线程会被阻塞。

  2. 可重入表示当前线程获取该锁后再次获取不会被阻塞,也就意味着同一个线程可以多次获得同一个锁而不会发生死锁。

  3. ReentrantLock 的加锁和解锁:

    new ReentrantLock() 默认创建的是非公平锁 NonfairSync。

    // 创建非公平锁
    ReentrantLock lock = new ReentrantLock();
    // 获取锁操作
    lock.lock();
    try {
        // 执行代码逻辑
    } catch (Exception ex) {
        // ...
    } finally {
        // 解锁操作
        lock.unlock();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

3.2.2公平锁/非公平锁

  1. 在公平锁模式下,锁会授予等待时间最长的线程
  2. 在非公平锁模式下,锁可能会授予刚刚请求它的线程,而不考虑等待时间

3.2.3实现可重入锁

  1. ReentrantLock 内部通过一个计数器来跟踪锁的持有次数
  2. 当线程调用 lock() 方法获取锁时,ReentrantLock 会检查当前状态,判断锁是否已经被其他线程持有。如果没有被持有,则当前线程将获得锁;如果锁已被其他线程持有,则当前线程将根据锁的公平性策略,可能会被加入到等待队列中
  3. 线程首次获取锁时,计数器值变为 1;如果同一线程再次获取锁,计数器增加;每释放一次锁,计数器减 1
  4. 当线程调用unlock()方法时,ReentrantLock 会将持有锁的计数减 1,如果计数到达 0,则释放锁,并唤醒等待队列中的线程来竞争锁

ReentrantLock 非公平锁加锁流程简图

3.2.4小结

  1. 首先,ReentrantLock 采用了独占模式,即同一时刻只允许一个线程持有锁。这保证了被锁保护的临界区只能被一个线程访问,从而避免了多个线程同时修改共享资源导致的数据竞争和不一致性。
  2. ReentrantLock的核心,是通过修改AQS中state的值来同步锁的状态。 通过这个方式,实现了可重入。
  3. ReentrantLock具备公平锁和非公平锁,默认使用非公平锁。其实现原理主要依赖于AQS中的同步队列。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/866356?site
推荐阅读
相关标签
  

闽ICP备14008679号