Java concurrency之公平锁(一)_动力节点Java学院整理
基本概念 本章,我们会讲解“线程获取公平锁”的原理;在讲解之前,需要了解几个基本概念。后面的内容,都是基于这些概念的;这些概念可能比较枯燥,但从这些概念中,能窥见“java锁”的一些架构,这对我们了解锁是有帮助的。 2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。 3. CLH队列 -- Craig,Landin,and Hagersten lock queue 4. CAS函数 -- Compare And Swap 本章是围绕“公平锁”如何获取锁而层次展开。“公平锁”涉及到的知识点比较多,但总的来说,不是特别难;如果读者能读懂AQS和ReentrantLock.java这两个类的大致意思,理解锁的原理和机制也就不成问题了。本章只是作者本人对锁的一点点理解,希望这部分知识能帮助您了解“公平锁”的获取过程,认识“锁”的框架。 ReentrantLock数据结构 ReentrantLock的UML类图 从图中可以看出: (01) ReentrantLock实现了Lock接口。 获取公平锁(基于JDK1.7.0_40) 通过前面“Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock”的“示例1”,我们知道,获取锁是通过lock()函数。下面,我们以lock()对获取公平锁的过程进行展开。 1. lock() lock()在ReentrantLock.java的FairSync类中实现,它的源码如下: final void lock() { acquire(1); } 说明:“当前线程”实际上是通过acquire(1)获取锁的。 这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。 2. acquire() acquire()在AQS中实现的,它的源码如下: public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) selfInterrupt(); } (01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。 上面是对acquire()的概括性说明。下面,我们将该函数分为4部分来逐步解析。 一. tryAcquire() 1. tryAcquire() 公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下: protected final boolean tryAcquire(int acquires) { // 获取“当前线程” final Thread current = Thread.currentThread(); // 获取“独占锁”的状态 int c = getState(); // c=0意味着“锁没有被任何线程锁拥有”, if (c == 0) { // 若“锁没有被任何线程锁拥有”, // 则判断“当前线程”是不是CLH队列中的第一个线程线程, // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。 if (!hasQueuedPredecessors() && compareAndSetState(0,acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 如果“独占锁”的拥有者已经为“当前线程”, // 则将更新锁的状态。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试! 2. hasQueuedPredecessors() hasQueuedPredecessors()在AQS中实现,源码如下: public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } 说明: 通过代码,能分析出,hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。下面对head、tail和Node进行说明。 3. Node的源码 Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下: private transient volatile Node head; // CLH队列的队首 private transient volatile Node tail; // CLH队列的队尾 // CLH队列的节点 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // 线程已被取消,对应的waitStatus的值 static final int CANCELLED = 1; // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。 // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 static final int SIGNAL = -1; // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值 static final int CONDITION = -2; // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值 static final int PROPAGATE = -3; // waitStatus为“CANCELLED,SIGNAL,CONDITION,PROPAGATE”时分别表示不同状态, // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。 volatile int waitStatus; // 前一节点 volatile Node prev; // 后一节点 volatile Node next; // 节点所对应的线程 volatile Thread thread; // nextWaiter是“区别当前CLH队列是 ‘独占锁'队列 还是 ‘共享锁'队列 的标记” // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列; // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。 Node nextWaiter; // “共享锁”则返回true,“独占锁”则返回false。 final boolean isShared() { return nextWaiter == SHARED; } // 返回前一节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。 Node(Thread thread,Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。 Node(Thread thread,int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } 说明: 4. compareAndSetState() compareAndSetState()在AQS中实现。它的源码如下: protected final boolean compareAndSetState(int expect,int update) { return unsafe.compareAndSwapInt(this,stateOffset,expect,update); } 说明: compareAndSwapInt() 是sun.misc.Unsafe类中的一个本地方法。对此,我们需要了解的是 compareAndSetState(expect,update) 是以原子的方式操作当前线程;若当前线程的状态为expect,则设置它的状态为update。 5. setExclusiveOwnerThread() setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中实现,它的源码如下: // exclusiveOwnerThread是当前拥有“独占锁”的线程 private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread t) { exclusiveOwnerThread = t; } 说明:setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。 6. getState(),setState() getState()和setState()都在AQS中实现,源码如下: // 锁的状态 private volatile int state; // 设置锁的状态 protected final void setState(int newState) { state = newState; } // 获取锁的状态 protected final int getState() { return state; } 说明:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。 小结:tryAcquire()的作用就是让“当前线程”尝试获取锁。获取成功返回true,失败则返回false。 二. addWaiter(Node.EXCLUSIVE) addWaiter(Node.EXCLUSIVE)的作用是,创建“当前线程”的Node节点,且Node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到CLH队列的末尾。 1.addWaiter() addWaiter()在AQS中实现,源码如下: private Node addWaiter(Node mode) { // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。 Node node = new Node(Thread.currentThread(),mode); Node pred = tail; // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred,node)) { pred.next = node; return node; } } // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。 enq(node); return node; } 说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。 2. compareAndSetTail() compareAndSetTail()在AQS中实现,源码如下: private final boolean compareAndSetTail(Node expect,Node update) { return unsafe.compareAndSwapObject(this,tailOffset,update); } 说明:compareAndSetTail也属于CAS函数,也是通过“本地方法”实现的。compareAndSetTail(expect,update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话,就将队尾设为update。 3. enq() enq()在AQS中实现,源码如下: private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t,node)) { t.next = node; return t; } } } } 说明: enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。 小结:addWaiter()的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。 三. acquireQueued() 前面,我们已经将当前线程添加到CLH队列中了。而acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。下面,我们看看acquireQueued()的具体流程。 1. acquireQueued() acquireQueued()在AQS中实现,源码如下: final boolean acquireQueued(final Node node,int arg) { boolean failed = true; try { // interrupted表示在CLH队列的调度中, // “当前线程”在休眠时,有没有被中断过。 boolean interrupted = false; for (;;) { // 获取上一个节点。 // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 说明:acquireQueued()的目的是从队列中获取锁。 2. shouldParkAfterFailedAcquire() shouldParkAfterFailedAcquire()在AQS中实现,源码如下: // 返回“当前线程是否应该阻塞” private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) { // 前继节点的状态 int ws = pred.waitStatus; // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。 if (ws == Node.SIGNAL) return true; // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点'的前继节点”。 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。 compareAndSetWaitStatus(pred,ws,Node.SIGNAL); } return false; } 说明: (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。 CANCELLED[1] -- 当前线程已被取消 (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。 规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。 3. parkAndCheckInterrupt()) parkAndCheckInterrupt()在AQS中实现,源码如下: private final boolean parkAndCheckInterrupt() { // 通过LockSupport的park()阻塞“当前线程”。 LockSupport.park(this); // 返回线程的中断状态。 return Thread.interrupted(); } 说明:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。 这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况: 补充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。 4. 再次tryAcquire() 了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } 说明: (01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点,若对此有疑惑可以查看下面关于Node类的介绍。 (02) p == head && tryAcquire(arg) 小结:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。 四. selfInterrupt() selfInterrupt()是AQS中实现,源码如下: private static void selfInterrupt() { Thread.currentThread().interrupt(); } 说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢? 在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”! 小结:selfInterrupt()的作用就是当前线程自己产生一个中断。 总结 再回过头看看acquire()函数,它最终的目的是获取锁! public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg)) selfInterrupt(); } (01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |