Contents

ReentrantLock源码分析

本文主要对ReentrantLock的源码进行了简单的分析,具体包括ReentrantLock的初始化(公平锁和非公平锁),加锁过程和解锁过程等。

1. AbstractQueuedSynchronizer

ReentrantLock的实现依赖于AbstractQueuedSynchronizer所以需要了解一下AQS

1.1 简介

类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,是java.util.concurrent的核心,CountDownLatchFutureTaskSemaphoreReentrantLock等都有一个内部类是这个抽象类的子类。

AQS定义两种资源共享方式:

  • Exclusive: 独占,只有一个线程能执行,如ReentrantLock
  • Share: 共享,多个线程可同时执行,如Semaphore/CountDownLatch

1.2 AQS的4个属性

// 头结点,大概可以看做是当前持有锁的线程
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后
private transient volatile Node tail;
//当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁 
//是可重入锁 每次获取都活加1
private volatile int state;
// 代表当前持有独占锁的线程 锁重入时用这个来判断当前线程是否已经拥有了锁
//继承自AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread; 

1.3 阻塞队列Node节点的属性

https://github.com/lixd/blog/raw/master/images/java/javase/aqs-wait-queue.png

Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已。

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 表示此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    //被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,
    //将会通知该后继结点的线程执行。
    //就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    //该标识的结点处于等待队列中,结点的线程等待在Condition上,等待其他线程唤醒
    //当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将
    //从等待队列转移到同步队列中,等待获取同步锁。
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    static final int PROPAGATE = -3;
    // =====================================================
	// 节点的等待状态
    // 取值为上面的1、-1、-2、-3,或者0
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    // 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的
    //AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程对象
    volatile Thread thread;

}

2. ReentrantLock的使用

/**
 * Server层
 * 模拟ReentrantLock使用
 *
 * @author illusoryCloud
 */
public class UserServer {
    /**
     * 默认是非公平锁 传入参数true则创建的是公平锁
     */
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void updateUser() {
        //加锁 同一时刻只能有一个线程更新User
        reentrantLock.lock();
        try {

            //do something
        } finally {
            //释放锁放在finally代码块中 保证出现异常等情况也能释放锁
            reentrantLock.unlock();
        }
    }
}

3. ReentrantLock源码分析

1. 初始化

ReentrantLock reentrantLock = new ReentrantLock(true);

/**
 *默认是非公平锁
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

2. 加锁过程

reentrantLock.lock();

公平锁实现如下(JDK1.8):

    /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
		//争锁
        final void lock() {
            //1
            acquire(1);
        }  
        
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

1. acquire(1);

    /**
     * 尝试获取锁
     */
    public final void acquire(int arg) {
        //tryAcquire(1) 首先尝试获取一下锁
        //若成功则不需要进入等待队列了
        //1.1
        if (!tryAcquire(arg) &&
            //1.2
            // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //1.3
            selfInterrupt();
    }

1.1 tryAcquire(1)

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         * 尝试直接获取锁,返回值是boolean,代表是否获取到锁
         * 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
         */
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //查看锁的状态
            int c = getState();
            //state == 0 此时此刻没有线程持有锁 可以直接获取锁
            if (c == 0) {
                //由于是公平锁 则在获取锁之前先看一下队列中还有没有其他等待的线程
                //讲究先来后到 所以是公平锁  这也是和非公平锁的差别
                //非公平锁在这里会直接尝试获取锁
                //1.1.1
                if (!hasQueuedPredecessors() &&
                   // 如果没有线程在等待,那就用CAS尝试获取一下锁
                   // 不成功的话,只能说明几乎同一时刻有个线程抢先获取到了锁
                   //因为刚才hasQueuedPredecessors判断是前面没有线程在等待的
                    //1.1.2
                    compareAndSetState(0, acquires)) {
                    //获取到锁后把当前线程设置为锁的拥有者
                    //1.1.3
                    setExclusiveOwnerThread(current);
                    //获取锁成功直接返回true
                    return true;
                }
            }
            //到这里说明当前锁已经被占了
            //然后判断如果当前线程就是持有锁的线程
            //那么这次就是锁的重入
            else if (current == getExclusiveOwnerThread()) {
                //把state加1
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                //1.1.4
                setState(nextc);
                return true;
            }
            //上面两个条件都不满足就返回false
            //获取锁失败了 回到上一个方法继续看
            return false;
        }

1.1.1 hasQueuedPredecessors()

/**
  * 通过判断"当前线程"是不是在CLH队列的队首
  * 来返回AQS中是不是有比“当前线程”等待更久的线程
  */
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

1.1.2 compareAndSetState(0, acquires))

/**
 * 通过CAS设置锁的状态
 */     
protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

1.1.3 setExclusiveOwnerThread(current)

/**
 * 设置锁的拥有者
 */     
protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

1.1.4 setState(nextc)

/**
 * 设置锁的状态
 */    
protected final void setState(int newState) {
        state = newState;
    }

回到前面的方法

    /**
     * 尝试获取锁
     */
    public final void acquire(int arg) {
        //tryAcquire(1) 首先尝试获取一下锁
        //若成功则不需要进入等待队列了
        //1.1
        if (!tryAcquire(arg) &&
            //1.2
            // tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
            //addWaiter(Node.EXCLUSIVE) 1.2.1
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //1.3
            selfInterrupt();
    }

1.1tryAcquire返回false则继续执行后面的

1.2acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

1.2.1 addWaiter(Node.EXCLUSIVE)

/**
 * 此方法的作用是把线程包装成node,同时进入到队列中
 * 参数mode此时是Node.EXCLUSIVE,代表独占模式
 */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 以下几行代码想把当前node加到链表的最后面去,也就是进到阻塞队列的最后
        Node pred = tail;

        // tail!=null --> 队列不为空
        if (pred != null) { 
            // 设置自己的前驱 为当前的队尾节点
            node.prev = pred; 
            // 用CAS把自己设置为队尾, 如果成功后,tail == node了
            //1.2.1.1
            if (compareAndSetTail(pred, node)) { 
                // 进到这里说明设置成功,当前node==tail, 将自己与之前的队尾相连,
                // 上面已经有 node.prev = pred
                // 加上下面这句,也就实现了和之前的尾节点双向连接了
                pred.next = node;
                // 线程入队了,可以返回了
                return node;
            }
        }
        // 仔细看看上面的代码,如果会到这里,
        // 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
      	//1.2.1.2
        enq(node);
        return node;
    }

1.2.1.1 compareAndSetTail(pred, node)

/**
 * 使用CAS设置队列的Tail
 */
private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

1.2.1.2enq(node)

/**
 * 进入这个方法只有两种可能:1.等待队列为空 2.有线程竞争入队
 * 采用自旋的方式入队
 * CAS设置tail过程中,竞争一次竞争不到,多次竞争,总会排到的
 */
    private Node enq(final Node node) {
        //无限循环
        for (;;) {
            Node t = tail;
            // 如果队列是空的就去初始化
            if (t == null) { // Must initialize
                // CAS初始化head节点
                //1.2.1.2.1
                if (compareAndSetHead(new Node()))
                // 给后面用:这个时候head节点的waitStatus==0, 看new Node()构造方法就知道了
                // 这个时候有了head,但是tail还是null,设置一下,
                // 设置完了以后,继续for循环,下次就到下面的else分支了
                    tail = head;
            } else {
                // 下面几行,和上一个方法 addWaiter 是一样的,
                // 通过CAS将当前线程排到队尾,有线程竞争的话排不上重复排
                // 直到成功了才return 
                // 这里return后前面的addWaiter()方法也返回 
                // 接下来进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

1.2 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    /**
     * 参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列
     * 如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话
     * 意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false
     *
     * 这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {  //这里无线循环 直到下面的条件满足
                //获取当前节点的前一个节点 设置为p
                final Node p = node.predecessor();
                //p=head说明当前节点是队列的第一个 
                // 所以当前节点可以去试抢一下锁
                // enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
                // 也就是说,当前的head可能不属于任何一个线程,所以作为队头,可以去试一试,
                // tryAcquire已经分析过了,就是简单用CAS试操作一下state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 到这里,说明上面的if分支没有成功
                //要么当前node本来就不是队头
                // 要么就是tryAcquire(arg)没有抢赢别人
                //1.2.2
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //1.2.3
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

1.2.2 shouldParkAfterFailedAcquire(p, node)

    /**
     * 进入这里说明抢到锁,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
     * 第一个参数是前驱节点,第二个参数代表当前线程的节点 这里一共有三个规则
     * 1.如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回true 将导致线程阻塞
     * 2.如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则找到一个非取消的前驱节点        *   返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true
     * 3.如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL
     *  返回false后进入acquireQueued的无限循环,与规则2同
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;

        // 前驱节点 waitStatus大于0 ,说明前驱节点取消了排队。
        // 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
        // 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
        // 就是为当前节点找一个正常的前驱节点 毕竟当前节点需要等着前驱节点来唤醒
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            // 这里就在循环直到找到一个waitStatus 不大于 0的前驱节点
            do { 
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 仔细想想,如果进入到这个分支意味着什么
            // 前驱节点的waitStatus不等于-1也不大于0,那也就是只可能是0,-2,-3
            // 这里说明一下:每个新的node入队时,waitStatu都是0
            // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

1.2.3 parkAndCheckInterrupt()

   /**
     *这个方法很简单,因为前面返回true,所以需要挂起线程,这个方法就是负责挂起线程的
     *这里用了LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

3. 解锁过程

reentrantLock.unlock() 解锁的代码比较相比加锁的要简单不少

/**
 * 解锁
 */
public void unlock() {
    //1
    sync.release(1);
}

1. sync.release(1)

/**
 * 释放锁
 *
 */

public final boolean release(int arg) {
   	//1.1 
    //这里尝试释放锁如果成功则进入if里面
    if (tryRelease(arg)) {
        // h赋值为当前的head节点
        Node h = head;
        //如果head节点不是null
        //并且head节点的waitStatus不等于0 即head节点不是刚初始化的
        //因为刚初始化是waitStatus是等于0的
        if (h != null && h.waitStatus != 0)
            //1.2 
            //唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

1.1 tryRelease(1)

/**
 * 尝试释放锁
 */
protected final boolean tryRelease(int releases) {
    //可重入锁 所以state可以大于1 每次释放时state减1
    int c = getState() - releases;
    //如果当前线程不是拥有锁的线程直接抛出异常 这肯定嘛 都没获取到锁你释放什么
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全释放锁
    boolean free = false;
    // state==0了 说明可以完全释放锁了
    if (c == 0) {
        free = true;
        //把锁的拥有者设置为null
        setExclusiveOwnerThread(null);
    }
    //锁的状态设置为0 即没有被获取
    setState(c);
    //到这里 锁已经释放了 
    //回到上边的release(1)方法
    return free;
}

1.2 unparkSuccessor(h)

/**
 * Wakes up node's successor, if one exists.
 * 唤醒后继节点 如果有的话
 * @param node the node 参数node是head头结点
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 如果head节点当前waitStatus<0, 将其修改为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
    Node s = node.next;
  //如果直接后继节点是null或者 waitStatus > 0即取消了等待
  //那么就直接从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 从后往前找,不必担心中间有节点取消(waitStatus==1)的情况
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果直接后继节点不是空的就直接唤醒
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);
}

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 刚刚线程被挂起在这里了
    return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的前驱是head了

4. 参考

https://javadoop.com/post/AbstractQueuedSynchronizer#toc0

https://blog.csdn.net/chen77716/article/details/6641477

https://www.cnblogs.com/waterystone/p/4920797.html