加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java.util.concurrent包详细分析--转

发布时间:2020-12-14 06:22:08 所属栏目:Java 来源:网络整理
导读:原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901 概述 .util.concurrent 包含许多线程安全、良好、高性能的并发构建块。不客气地说,创建java.util.concurrent 的目的就是要实现 Collection 框架对所执行的并发操作。通过提供一组可靠

原文地址:http://blog.csdn.net/windsunmoon/article/details/36903901

概述

.util.concurrent 包含许多线程安全、良好、高性能的并发构建块。不客气地说,创建java.util.concurrent 的目的就是要实现 Collection 框架对所执行的并发操作。通过提供一组可靠的、高性能并发构建块,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性。

此包包含locks,concurrent,atomic 三个包。

Atomic:原子数据的构建。

Locks:基本的锁的实现,最重要的AQS框架和lockSupport

Concurrent:构建的一些高级的工具,如线程池,并发队列等。

其中都用到了CAS(compare-and-swap)操作。CAS 是一种低级别的、细粒度的技术,它允许多个线程更新一个内存位置,同时能够检测其他线程的冲突并进行恢复。它是许多高性能并发的基础。在 JDK 5.0 之前,Java 语言中用于协调线程之间的访问的惟一原语是同步,同步是更重量级和粗粒度的。公开 CAS 可以开发高度可伸缩的并发 Java 类。这些更改主要由 JDK 库类使用,而不是由开发人员使用。

CAS操作都封装在java 不公开的类库中,sun.misc.Unsafe。此类包含了对原子操作的封装,具体用本地代码实现。本地的C代码直接利用到了硬件上的原子操作。

Atomic原子数据

?这个包里面提供了一组原子变量类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。

?java.util.concurrent.atomic中的类可以分成4组:

标量类(Scalar):AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference

数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater

复合变量类:AtomicMarkableReference,AtomicStampedReference

标量类

第一组AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。

他们的实现都是依靠 真正的值为volatile 类型,通过Unsafe 包中的原子操作实现。最基础就是CAS,他是一切的基础。如下 。其中offset是 在内存中 value相对于基地址的偏移量。(它的获得也由Unsafe 本地代码获得)。关于加锁的原理见附录。

核心代码如下,其他都是在compareAndSet基础上构建的。

private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();

private?volatile?int?value;??

public?final?int?get()?{??

return?value;??

public?final?void?set(int?newValue)?{??

public?final?boolean?compareAndSet(int?expect,?int?update)?{??

return?unsafe.compareAndSwapInt(this,?valueOffset,?expect,?update);??

void set()和void lazySet():set设置为给定值,直接修改原始值;lazySet延时设置变量值,这个等价于set()方法,但是由于字段是volatile类型的,因此次字段的修改会比普通字段(非volatile字段)有稍微的性能延时(尽管可以忽略),所以如果不是想立即读取设置的新值,允许在“后台”修改值,那么此方法就很有用。

getAndSet( )方法,利用compareAndSet循环自旋实现。

原子的将变量设定为新数据,同时返回先前的旧数据。

其本质是get( )操作,然后做set( )操作。尽管这2个操作都是atomic,但是他们合并在一起的时候,就不是atomic。在Java的源程序的级别上,如果不依赖synchronized的机制来完成这个工作,是不可能的。只有依靠native方法才可以。

Java代码??

public?final?int?getAndSet(int?newValue)?{??

for?(;;)?{??

int?current?=?get();??

if?(compareAndSet(current,?newValue))??

return?current;??

对于 AtomicInteger、AtomicLong还提供了一些特别的方法。贴别是如,

getAndAdd( ):以原子方式将给定值与当前值相加, 相当于线程安全的t=i;i+=delta;return t;操作。

以实现一些加法,减法原子操作。(注意 --i、++i不是原子操作,其中包含有3个操作步骤:第一步,读取i;第二步,加1或减1;第三步:写回内存)

数组类

第二组AtomicIntegerArray,AtomicLongArray还有AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供?volatile?访问语义方面也引人注目,这对于普通数组来说是不受支持的。

他们内部并不是像AtomicInteger一样维持一个valatile变量,而是全部由native方法实现,如下AtomicIntegerArray的实现片断:

Java代码??

private?static?final?Unsafe?unsafe?=?Unsafe.getUnsafe();??

private?static?final?int?base?=?unsafe.arrayBaseOffset(int[].class);??//数组基地址

private?static?final?int?scale?=?unsafe.arrayIndexScale(int[].class);??//数组元素占的大小精度

private?final?int[]?array;??

public?final?int?get(int?i)?{??

return?unsafe.getIntVolatile(array,?rawIndex(i));??

public?final?void?set(int?i,?int?newValue)?{??

12.??private longrawIndex(int i) {//获取具体某个元素的偏移量

13.?????????if (i <0 || i >= array.length)

14.?????????????thrownew IndexOutOfBoundsException("index " + i);

15.?????????return base+ (long) i * scale;

16.?}

更新器类

第三组AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater基于反射的实用工具,可以对指定类的指定?volatile?字段进行原子更新。API非常简单,但是也是有一些约束:

(1)字段必须是volatile类型的

(2)字段的描述类型(修饰符public/protected/default/private)是与调用者与操作对象字段的关系一致。也就是说 调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。

(3)只能是实例变量,不能是类变量,也就是说不能加static关键字。

(4)只能是可修改变量,不能使final变量,因为final的语义就是不可修改。实际上final的语义和volatile是有冲突的,这两个关键字不能同时存在。

(5)对于AtomicIntegerFieldUpdater?和AtomicLongFieldUpdater?只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater?。

复合变量类

防止ABA问题出现而构造的类。如什么是ABA问题呢,当某些流程在处理过程中是顺向的,也就是不允许重复处理的情况下,在某些情况下导致一个数据由A变成B,再中间可能经过0-N个环节后变成了A,此时A不允许再变成B了,因为此时的状态已经发生了改变,他们都是对atomicReference的进一步包装,AtomicMarkableReferenceAtomicStampedReference功能差不多,有点区别的是:它描述更加简单的是与否的关系,通常ABA问题只有两种状态,而AtomicStampedReference是多种状态,那么为什么还要有AtomicMarkableReference呢,因为它在处理是与否上面更加具有可读性。

Lcoks 锁

此包中实现的最基本的锁,阻塞线程的LockSupport。核心是AQS框架(AbstractQueuedSynchronizer),是J U C(util concurrent) 最复杂的一个类。

Lock 和Synchronized

J U C 中的Lock和synchronized具有同样的语义和功能。不同的是,synchronized 锁在退出块时自动释放。而Lock 需要手动释放,且Lock更加灵活。Syschronizd 是 java 语言层面的,是系统关键字;Lock则是java 1.5以来提供的一个类。

Synchronized 具有以下缺陷,它无法中断一个正在等候获得锁的线程;也无法通过投票得到锁,如果不想等下去,也就没法得到锁;同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行。

而Lock(如ReentrantLock?)除了与Synchronized 具有相同的语义外,还支持锁投票定时锁等候可中断锁等候(就是说在等待锁的过程中,可以被中断)的一些特性。

new?Thread(task1,?"aa");

new?Thread(task1,?"bb");

?????? b.interrupt();

LockSupport 和java内置锁

???在LockSupport出现之前,如果要block/unblock某个Thread,除了使用Java语言内置的monitor机制之外,只能通过Thread.suspend()和Thread.resume()。然而Thread.suspend()和Thread.resume()基本上不可用,除了可能导致死锁之外,它们还存在一个无法解决的竞争条件:如果在调用Thread.suspend()之前调用了Thread.resume(),那么该Thread.resume()调用没有任何效果。LockSupport最主要的作用,便是通过一个许可(permit)状态,解决了这个问题。LockSupport?只能阻塞当前线程,但是可以唤醒任意线程。

?????那么LockSupport和Java语言内置的monitor机制有什么区别呢?它们的语义是不同的。LockSupport是针对特定Thread来进行block/unblock操作的;wait()/notify()/notifyAll()是用来操作特定对象的等待集合的。为了防止知识生锈,在这里简单介绍一下Java语言内置的monitor机制(详见:?)。正如每个Object都有一个锁,每个Object也有一个等待集合(wait set),它有wait、notify、notifyAll和Thread.interrupt方法来操作。同时拥有锁和等待集合的实体,通常被成为监视器(monitor)。每个Object的等待集合是由JVM维护的。等待集合一直存放着那些因为调用对象的wait方法而被阻塞的线程。由于等待集合和锁之间的交互机制,只有获得目标对象的同步锁时,才可以调用它的wait、notify和notifyAll方法。这种要求通常无法靠编译来检查,如果条件不能满足,那么在运行的时候调用以上方法就会导致其抛出IllegalMonitorStateException。

??? wait() 方法被调用后,会执行如下操作:

??? notify()方法被调用后,会执行如下操作:

??? notifyAll()方法被调用后的操作和notify()类似,不同的只是等待集合中所有的线程(同时)都要执行那些操作。然而等待集合中的线程必须要在竞争到目标对象的同步锁之后,才能继续执行。

?在标准的Sun jdk 中,Locksupport的实现基于Unsafe,都是本地代码,的实现不全是本地代码。

一个线程调用park阻塞之后,如果被其他线程调用interrupt(),那么他它会响应中断,解除阻塞,但是不会抛出interruption?异常。这点在构造可中断获取锁的时候用到了。

AbstractQueuedSynchronizer

AQS框架是 J U C包的核心。是构建同步、锁、信号量和自定义锁的基础。也是构建高级工具的基础。

从上图可以看到,锁,信号量的实现内部都有两个内部类,都继承AQS。

由于AQS的构建上采用模板模式(Template mode),即 AQS定义一些框架,而它的实现延迟到子类。如tryAcquire()方法。由于这个模式,我们如果直接看AQS源码会比较抽象。所以从某个具体的实现切入简单易懂。这里选泽ReentrantLock,它和Synchronized具有同样的语义。

简单说来,AbstractQueuedSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全 部处于阻塞状态,经过调查线程的显式阻塞是通过调用LockSupport.park()完成,而LockSupport.park()则调用 sun.misc.Unsafe.park()本地方法,再进一步,HotSpot在中中通过调用pthread_mutex_lock函数把 线程交给系统内核进行阻塞。

ReentrantLock

从ReentrantLock(可重入锁)开始,分析AQS。首先需要知道这个锁和java 内置的同步Synchronized具有同样的语义。如下代码解释重入的意思

new?ReentrantLock();

public?void?test() {

out.print("I am test1");

public?void?test2() {

out.println("I am test1");

??? }

</td>

</tr></table>

重入的意思就是,如果已经获得了锁,如果执行期间还需要获得这个锁的话,会直接获得所,不会被阻塞,获得锁的次数加1;每执行一次unlock,持有锁的次数减1,当为0时释放锁。这点,Synchronized 具有同样语义。

查看源码,可以看到ReentrantLock 对Lock接口的实现,把所有的操作都委派给一个叫Sync的类,如下源码:

?????

其中Sync的定义如右图

所以这个Syc类是关键。而Sync 基础AQS。Sync又有两个子类,

final static class NonfairSync extends Sync?

final static class FairSync extends Sync?

</td>

</tr></table>

显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁。

先理一下Reentrant.lock()方法的调用过程(默认非公平锁):

这 些讨厌的Template模式导致很难直观的看到整个调用过程,其实通过上面调用过程及AbstractQueuedSynchronizer的注释可以发现,AbstractQueuedSynchronizer中抽象了绝大多数Lock的功能,而只把tryAcquire方法延迟到子类中实现。 tryAcquire方法的语义在于用具体子类判断请求线程是否可以获得锁,无论成功与否AbstractQueuedSynchronizer都将处理后面的流程。

NonfairSync 和 FairSync 不同的是执行lock时做的操作,如下为 NonfairSync 的操作,其中compareAndSetState(intexpect,int des) 为AQS的方法,设置同步状态,NonfairSync 通过修改同步状态获得锁,锁定不成功才执行acquire(1),此方法也在AQS中定义。而 FairSync.lock 直接执行acquire(1)。

final?void?lock() {

if?(compareAndSetState(0,1))

currentThread());

else

}

</td>

</tr></table>

AQS中的Acquire(int)方法调用子类中的tryAcquire(int)实现,这里正是模板模式。如下面的源码。自此已经进入到了AQS的实现。

public?final?void?acquire(int?arg) {

if?(!tryAcquire(arg) &&

EXCLUSIVE),arg))

selfInterrupt();

}

</td>

</tr></table>

其他方法的调用顺序类似,如unlock 调用AQS的release ,release 调用Sync的tryRelease()。

下面看NonfairSync.tryAcquire,它调用Sync.nonfairTryAcquire。以下为实现,首先获取同步状态c,o代表锁没有线程正在竞争锁。如果c=0,那么尝试用CAS操作获得锁;或者c!=0,但是锁被当前线程拥有,那么获得锁的次数增加 acquires 次,这就是重入的概念。以上两种情况都成功获得锁,返回真。如果不是以上两种情况,就没有获得锁,返回假。

final?boolean?nonfairTryAcquire(int?acquires) {

final?Thread current = Thread.currentThread();

int?c = getState();

if?(c == 0) {

if?(compareAndSetState(0,acquires)) {

return?true;

else?if?(current == getExclusiveOwnerThread()) {

int?nextc = c + acquires;

if?(nextc < 0)?// overflow

throw?new?Error("Maximum lock count exceeded");

return?true;

return?false;

??????? }

</td>

</tr></table>

如果没有获得锁,即NonfairSync.tryAcqiuer()返回假,那么可以看出 AQS.acquire 将执行acquireQueued(addWaiter(Node.EXCLUSIVE),arg);将此线程追加到等待队列的队尾。其中Node是AQS的一个内部类,他是等待队列节点的抽象。

private?Node addWaiter(Node mode) {

new?Node(Thread.currentThread(),mode);

if?(pred !=?null) {

if?(compareAndSetTail(pred,node)) {

return?node;

return?node;

}

</td>

</tr></table>

其中mode指的是模式,NULL 为独占,否则为共享锁。RetranLock为独占锁。首先把线程包装为一个节点。然后获取等待队列的尾,如果不为NULL的话(这说明有其他线程在待队列中行),就把初始化node的前驱为pred.( node.prev?= pred) 然后通过CAS操作把node 设置为新的队尾,如果成功则设置pred的后继为 node.至此 快速进队完成。

但是如果pred为null(此时没有线程在等待,一开始tail 就是null) ,或者CAS设置队尾失败。则需要执行下面的入队流程。?这里可能是整个阻塞队列的初始化过程。Tail 为null

private?Node enq(final?Node node) {

for?(;;) {

if?(t ==?null) {?// Must initialize

new?Node();?// Dummy header

if?(compareAndSetHead(h)) {

return?h;

else?{

if?(compareAndSetTail(t,node)) {

return?t;

??? }

</td>

</tr></table>

该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前现在追加到队尾,并返回包装后的Node实例。

把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:

SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程release或cancel时要重新这个后继线程(unpark)

CANCELLED(1):因为超时或中断,该线程已经被取消

CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞。

PROPAGATE(-3):传播共享锁

0:0代表无状态

接下来执行acquireQueued(Node)方法。acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回。

final?boolean?acquireQueued(final?Node node,?int?arg) {

try?{

boolean?interrupted =?false;

for?(;;) {

final?Node p = node.predecessor();

if?(p ==?head?&& tryAcquire(arg)) {

null;?// help GC

return?interrupted;

if?(shouldParkAfterFailedAcquire(p,node) &&

true;

catch?(RuntimeException ex) {

throw?ex;

??? }

</td>

</tr></table>

以上的循环不会无限进行,因为接下来线程会被阻塞。这由parkAndCheckInterrupt()方法实现,但是它只有在shouldParkAfterFailedAcquire 方法返回 true 的时候后才会继续执行进而阻塞。所以看 shouldParkAfterFailedAcquire方法,从方法的名字看 意思是,当获取锁失败的时候是否应该阻塞。

private?static?boolean?shouldParkAfterFailedAcquire(Node pred,Node node) {

int?ws = pred.waitStatus;

if?(ws == Node.SIGNAL)

return?true;

if?(ws > 0) {

do?{

while?(pred.waitStatus?> 0);

else?{

compareAndSetWaitStatus(pred,ws,Node.SIGNAL);

return?false;

??? }

</td>

</tr></table>

此方法的作用是根据它的前驱节点决定本节点做什么样的操作。前面已经说过Node的节点的waitState 表示它个后继节点 需要做什么操作。这里就是对线程状态的检查,所有这个方法参数中有前驱节点。

检查原则在于:

规则1:如果前继的节点状态为SIGNAL,表明当前节点需要unpark,则返回成功,此时acquireQueued方法的第12行(parkAndCheckInterrupt)将导致线程阻塞

规则2:如果前继节点状态为CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回false,acquireQueued方法的无限循环将递归调用该方法,直至规则1返回true,导致线程阻塞

规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,返回false后进入acquireQueued的无限循环,与规则2同

总体看来,shouldParkAfterFailedAcquire就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于CANCELLED状态,则顺便删除这些节点重新构造队列。

至此,获取锁完毕。

请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。

解锁的过程相对简单一些。

调用关系如下顺序 ReentrantLock.unlock()????AQS.release()? --Synx.tryRealse()

从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。此可以看到,把tryAcquire方法延迟到子类中实现的做法非常精妙并具有极强的可扩展性,令人叹为观止!当然精妙的不是这个Templae设计模式,而是Doug Lea对锁结构的精心布局。

public?void?unlock() {

}

</td>

</tr></table>
<p align="left">release的语义在于:如果可以释放锁,则唤醒队列第一个线程(Head.next)。release先调用tryRelease调用是否解锁成功,解锁成长才进行下一步操作。


<table border="0" cellspacing="0" cellpadding="0">
<tr>
<td valign="top">
<p align="left">public?final?boolean?release(int?arg) {


<p align="left">????????if?(tryRelease(arg)) {


<p align="left">??????????? Node h =?head;


<p align="left">????????????if?(h !=?null?&& h.waitStatus?!= 0)


<p align="left">??????????????? unparkSuccessor(h);


<p align="left">????????????return?true;


<p align="left">??????? }


<p align="left">????????return?false;

??? }

</td>

</tr>

</table>

tryRelease与tryAcquire语义相同,把如何释放的逻辑延迟到子类中。tryRelease语义很明确:如果线程多次锁定,则进行多次释放,直至status==0则真正释放锁,所谓释放锁即设置status为0,因为无竞争所以没有使用CAS。如下源代码

protected?final?boolean?tryRelease(int?releases) {

int?c = getState() - releases;

if?(Thread.currentThread() != getExclusiveOwnerThread())

throw?new?IllegalMonitorStateException();

boolean?free =?false;

if?(c == 0) {

true;

null);

return?free;

??????? }

</td>

</tr></table>

下面的源代码是唤醒队列的第一个线程。但是其可能被取消,当被取消的时候,从队尾往前找线程。(不从对头开始的原因是,队尾一直在变化,不容易判断)

private?void?unparkSuccessor(Node node) {

int?ws = node.waitStatus;

if?(ws < 0)

compareAndSetWaitStatus(node,0);

if?(s ==?null?|| s.waitStatus?> 0) {

null;

for?(Node t =?tail; t !=?null?&& t != node; t = t.prev)

if?(t.waitStatus?<= 0)

if?(s !=?null)

unpark(s.thread);

??? }

</td>

</tr></table>

可中断锁的实现:本质是调用 AQS. 他在响应中断后直接跳出循环,抛出异常,而正常额Lock 忽略这个中断,只是简单的记录下,然后继续循环。

private?void?doAcquireInterruptibly(int?arg)

throws?InterruptedException {

final?Node node = addWaiter(Node.EXCLUSIVE);

try?{

for?(;;) {

final?Node p = node.predecessor();

if?(p ==?head?&& tryAcquire(arg)) {

null;?// help GC

return;

if?(shouldParkAfterFailedAcquire(p,node) &&

break;

catch?(RuntimeException ex) {

throw?ex;

throw?new?InterruptedException();

??? }

</td>

</tr></table>

超时锁的实现基本类似,就是阻塞一段时间后自己恢复,如果有中断则抛出异常。

private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout)

throws?InterruptedException {

long?lastTime = System.nanoTime();

final?Node node = addWaiter(Node.EXCLUSIVE);

try?{

for?(;;) {

final?Node p = node.predecessor();

if?(p ==?head?&& tryAcquire(arg)) {

null;?// help GC

return?true;

if?(nanosTimeout <= 0) {

return?false;

if?(nanosTimeout >?spinForTimeoutThreshold?&&

shouldParkAfterFailedAcquire(p,node))

parkNanos(this,nanosTimeout);

long?now = System.nanoTime();

if?(Thread.interrupted())

break;

catch?(RuntimeException ex) {

throw?ex;

throw?new?();

??? }

</td>

</tr></table>

Condition

Condition 实现了与java内容monitor 类似的功能。提供 await,signal,signalall 等操作,与object . wait等一系列操作对应。不同的是一个condition 可以有多个条件队列。这点内置monitor 是做不到的。另外还支持 超时、取消等更加灵活的方式。

和内置的Monitor一样,调用 Condition。aWait 等操作,需要获得锁,也就是 Condition 是和一个锁绑定在一起的。它的实现 是在AQS中,基本思想如下:一下内容抄自博客:

public final void await() throws InterruptedException {

??? // 1.如果当前线程被中断,则抛出中断异常

??? if (Thread.interrupted())

??????? throw newInterruptedException();

??? // 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。

??? Node node = addConditionWaiter();

??? // 3.调用tryRelease,释放当前线程的锁

??? long savedState =fullyRelease(node);

??? int interruptMode = 0;

??? // 4.为什么会有在AQS的等待队列的判断?

??? // 解答:signal*作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了

??? // 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环

??? while (!isOnSyncQueue(node)) {

??????? LockSupport.park(this);

??????? if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)

??????????? break;

??? }

??? // 5.这个时候线程已经被signal()或者signalAll()*作给唤醒了,退出了4中的while循环

??? // 自旋等待尝试再次获取锁,调用acquireQueued方法

??? if (acquireQueued(node,savedState) && interruptMode != THROW_IE)

??????? interruptMode = REINTERRUPT;

??? if (node.nextWaiter != null)

??????? unlinkCancelledWaiters();

??? if (interruptMode != 0)

??????? reportInterruptAfterWait(interruptMode);

}

整个await的过程如下:

  1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。进行2。

  2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。

  3.自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。进行4。

  4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

可以看到,这个await的*作过程和Object.wait()方法是一样,只不过await()采用了Condition队列的方式实现了Object.wait()的功能。

signal和signalAll方法

await*()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。

?Java Code

public final void signal() {

??? if (!isHeldExclusively())

??????? throw newIllegalMonitorStateException();

??? Node first = firstWaiter;

??? if (first != null)

??????? doSignal(first);

}

这里先判断当前线程是否持有锁,如果没有持有,则抛出异常,然后判断整个condition队列是否为空,不为空则调用doSignal方法来唤醒线程,看看doSignal方法都干了一些什么:

?Java Code

private void doSignal(Node first) {

??? do {

??????? if ( (firstWaiter =first.nextWaiter) == null)

??????????? lastWaiter = null;

??????? first.nextWaiter = null;

??? } while(!transferForSignal(first) &&

???????????? (first = firstWaiter)!= null);

}

上面的代*很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

?Java Code

final boolean transferForSignal(Node node) {

??? /*

???? * 设置node的waitStatus:Condition->0

???? */

??? if(!compareAndSetWaitStatus(node,Node.CONDITION,0))

??????? return false;

??? /*

???? * 加入到AQS的等待队列,让节点继续获取锁

???? * 设置前置节点状态为SIGNAL

???? */

??? Node p = enq(node);

??? int c = p.waitStatus;

??? if (c > 0 ||!compareAndSetWaitStatus(p,c,Node.SIGNAL))

???????LockSupport.unpark(node.thread);

??? return true;

}

上面就是唤醒一个await*()线程的过程,根据前面的介绍,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)*作,将当前节点加入到AQS队列。

signalAll和signal方法类似,主要的不同在于它不是调用doSignal方法,而是调用doSignalAll方法:

?Java Code

private void doSignalAll(Node first) {

??? lastWaiter = firstWaiter? = null;

??? do {

??????? Node next =first.nextWaiter;

??????? first.nextWaiter = null;

??????? transferForSignal(first);

??????? first = next;

??? } while (first != null);

}

这个方法就相当于把Condition队列中的所有Node全部取出插入到等待队列中去。

线程池

JUC 中提供了线程池的实现,其基于一系列的抽象和接口。接下里一步一步解开线程池的神秘面纱。

首先应该了解线程池的使用。J U C 提供了一个 构造线程池的 工厂类。java.util.concurrent.Executors 。此工厂提供了构造各种不同类型线程池的静态方法。如固定线程池,单一工作线程池,和缓存线程池等。

如下代码构造了一个具有2个固定工作线程的线程池。

ExecutorService ser = Executors.newFixedThreadPool(2);

</td>

</tr></table>

经过跟踪,此构造函数最终调用如下,其参数解释如下:

<table border="0" cellspacing="0" cellpadding="0">

<tr>
<td valign="top">
<p align="left">public?ThreadPoolExecutor(int?corePoolSize,


<p align="left">???????? ?????????????????????int?maximumPoolSize,


<p align="left">??????????????????????????????long?keepAliveTime,


<p align="left">????????????????????????????? TimeUnit unit,


<p align="left">????????????????????????????? BlockingQueue workQueue,


<p align="left">????????????????????????????? ThreadFactory threadFactory,


<p align="left">????????????????????????????? RejectedExecutionHandler handler) {


<p align="left">????????if?(corePoolSize < 0 ||


<p align="left">??????????? maximumPoolSize <= 0 ||


<p align="left">??????????? maximumPoolSize < corePoolSize ||


<p align="left">??????????? keepAliveTime < 0)


<p align="left">????????????throw?new?IllegalArgumentException();


<p align="left">????????if?(workQueue ==?null?|| threadFactory ==?null?|| handler ==?null)


<p align="left">????????????throw?new?NullPointerException();


<p align="left">????????this.corePoolSize?= corePoolSize;


<p align="left">????????this.maximumPoolSize?= maximumPoolSize;


<p align="left">????????this.workQueue?= workQueue;


<p align="left">????????this.keepAliveTime?= unit.toNanos(keepAliveTime);


<p align="left">????????this.threadFactory?= threadFactory;


<p align="left">????????this.handler?= handler;

??? }

</td>

</tr>

</table>

我们构造的线程池的类型是?ExecutorService,ThreadPoolExecutor继承AbstractExecutorService,其总体类图如下,可以看到最初的抽象是Exector。

接口Executor

该接口只有一个方法,JDK解释如下

执行已提交的Runnable?任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。

不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:

class DirectExecutor implements Executor {

???? public void execute(Runnable r) {

???????? r.run();

???? }

?}

</td>

</tr></table>

更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。

class ThreadPerTaskExecutor implements Executor {

???? public void execute(Runnable r) {

???????? new Thread(r).start();

???? }

?}

</td>

</tr></table>

方法介绍如下:

void execute(?command)

在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由Executor实现决定。

接口ExecutorService

ExecutorService 是对?Executor?的扩展,JDK文档解释如下:

此接口中的关键是三个submit 方法,接受一个任务,并返回结果Future。

Future submit(Callable task);

Future submit(Runnable task,T result);

Future submit(Runnable task);

其中Callable 就是带返回结果的Runnable。定义如下:

public?interface?Callable {

@return?computed result

@throws?Exception if unable to compute a result

throws?Exception;

}

精彩的是返回一个表示任务的未决结果的 Future。该 Future 的get?方法在成功完成时将会返回该任务的结果。注意这些过程是异步的。

接口Future

JDK解释如下:

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future 形式类型、并返回 null 作为底层任务的结果。

它的方法简介如下:

?boolean

cancel(boolean?mayInterruptIfRunning)???????????试图取消对此任务的执行。

</td>

</tr>
<tr>
<td valign="top">
<p align="right">?<a title="Future 中的类型参数" href="http://blog.csdn.net/windsunmoon/article/details/36903901" target="_blank">V

</td>
<td>

get()???????????如有必要,等待计算完成,然后获取其结果。

</td>

</tr>
<tr>
<td valign="top">
<p align="right">?<a title="Future 中的类型参数" href="http://blog.csdn.net/windsunmoon/article/details/36903901" target="_blank">V

</td>
<td>

get(long?timeout,??unit)???????????如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。

</td>

</tr>
<tr>
<td valign="top">
<p align="right">?boolean

</td>
<td>

isCancelled()???????????如果在任务正常完成前将其取消,则返回?true

</td>

</tr>
<tr>
<td valign="top">
<p align="right">?boolean

</td>
<td>

isDone()???????????如果任务已完成,则返回?true

</td>

</tr>

Submit后发生的事情

有了以上的一些基本了解,接下来看当任务提交之后发生的一系列过程。

Submit 的实际代码位于AbstractExecutorService,继承ExecutorService。来观察其三个submit方法。

构造RunnableFuture

public?Future submit(Runnable task) {

if?(task ==?null)?throw?new?NullPointerException();

ftask = newTaskFor(task,?null);

return?ftask;

public? Future submit(Runnable task,T result) {

if?(task ==?null)?throw?new?NullPointerException();

ftask = newTaskFor(task,result);

return?ftask;

public? Future submit(Callable task) {

if?(task ==?null)?throw?new?NullPointerException();

ftask = newTaskFor(task);

return?ftask;

??? }

</td>

</tr></table>

可以看出,不论submit 方法的参数是什么,都是先构造一个RunnableFuture ,然偶执行它,并返回它。执行和返回的都是RunnableFuture。所以RunnableFuture实现了future 接口和runnnable接口。注意这点的类型是RunnableFuture,所有接下来的execute方法执行的run方法是RunnableFuture?的具体实现类FutureTask的run方法。

来看RunnableFuture,其代码如下:

run?method causes completion of the?Future

@see?FutureTask

@see?Executor

@since?1.6

@author?Doug Lea

@param?The result type returned by this Future's?get?method

public?interface?RunnableFuture?extends?Runnable,Future {

void?run();

}

</td>

</tr></table>

作为??的?。成功执行 run 方法可以完成 Future 并允许访问其结果。以下代码可以看出 返回的实际上是FutureTask,为RunnableFuture的实现类。

protected? RunnableFuture newTaskFor(Runnable runnable,T value) {

return?new?FutureTask(runnable,value);}

</td>

</tr></table>

关于 FutureTask ?JDK对其介绍如下:

可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对?的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。

可使用FutureTask 包装??或??对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给??执行。

先看其构造函数。可以看出其构造函数主要是一个 同步器的构造。同步器接受一个Callable类型的参数。

public?FutureTask(Callable callable) {

if?(callable ==?null)

throw?new?NullPointerException();

new?Sync(callable);

public?FutureTask(Runnable runnable,V result) {

new?Sync(Executors.callable(runnable,result));

??? }

</td>

</tr></table>

对于参数是Runnable 类型时,经过转化为Callable 类型,转化代码如下,本质上就是在Callable 的call方法中调用Runnable的run方法:

public?static? Callable callable(Runnable task,T result) {

if?(task ==?null)

throw?new?NullPointerException();

return?new?RunnableAdapter(task,result);

}

static?final?class?RunnableAdapter?implements?Callable {

final?Runnable?task;

final?T?result;

this.task?= task;

this.result?= result;

public?T call() {

return?result;

??? }

</td>

</tr></table>

FutureTask的关键逻辑都由他的一个内部类Sync 实现。我们先暂且不管其具体实现,留在后面说。

执行

接下来看 执行任务。Execute 方法实现在ThreadPoolExecutor 类中,这是具体的线程池。其Execute 方法如下:

public?void?execute(Runnable command) {

if?(command ==?null)

throw?new?NullPointerException();

if?(poolSize?>=?corePoolSize?|| !addIfUnderCorePoolSize(command)) {

if?(runState?==?RUNNING?&&?workQueue.offer(command)) {

if?(runState?!=?RUNNING?||?poolSize?== 0)

else?if?(!addIfUnderMaximumPoolSize(command))//这里是再给一次机会

??? }

</td>

</tr></table>

具体的逻辑如下描述:

首先判断空;

如果当前池大小 小于 核心池大小(初始就是这样),那么会执行 addIfUnderCorePoolSize这个方法。这个方法就会创建新的工作线程,且把当前任务 command 设置为他的第一个任务,并开始执行,返回true。整个execute方法结束。(1)否则加入到等待队列中。(2)

执行情况1

先看情况1:如下代码,只有当前池大小小于核心池大小的时候,且线程池处于RUNNING状态的时候才增加新的工作线程,并把传进来的任务作为第一个任务并开始执行。此时返回真,否则返回假。

@param?firstTask the task the new thread should run first (or

@return?true if successful

private?boolean?addIfUnderCorePoolSize(Runnable firstTask) {

null;

final?ReentrantLock mainLock =?this.mainLock;

try?{

if?(poolSize?RUNNING)

finally?{

if?(t ==?null)

return?false;

return?true;

}

<p align="left">/*


<p align="left">????
Creates and returns a new thread running firstTask as its first


<p align="left">???? task. Call only while holding mainLock.


<p align="left">????


<p align="left">???? ?@param?firstTask the task the new thread should run first (or


<p align="left">????
null if none)


<p align="left">???? ?@return?the new thread,or null if threadFactory fails to create thread


<p align="left">????
/


<p align="left">????private?Thread addThread(Runnable firstTask) {


<p align="left">??????? Worker w =?new?Worker(firstTask);//工作线程,


<p align="left">??????? Thread t =?threadFactory.newThread(w);//封装成线程


<p align="left">????????if?(t !=?null) {


<p align="left">??????????? w.thread?= t;


<p align="left">????????????workers.add(w);


<p align="left">????????????int?nt = ++poolSize;


<p align="left">????????????if?(nt >?largestPoolSize)


<p align="left">????????????????largestPoolSize?= nt;


<p align="left">??????? }


<p align="left">????????return?t;

??? }

</td>

</tr></table>

执行情况2

如果当前池大小 大于核心池的大小,或者添加新的工作线程失败(这可能是多线程环境下,竞争锁,被阻塞,其他线程已经创建好了工作线程)。那么当前任务进入到等待队列。

如果队列满,或者线程池已经关闭,那么拒绝该任务。

工作线程worker

对工作线程的封装是类Worker,它实现了Runnable接口。addThread方法把Worker 组成线程(用threadFactory),并加入线程池,重新设置线程池 大小的 达到的最大值。

重点研究下worker的run方法,首先运行第一个任务,以后通过getTask()获取新的任务,如果得不到,工作线程会自动结束,在结束前 会执行一些工作,见后面。

public?void?run() {

try?{

null;

while?(task !=?null?|| (task = getTask()) !=?null) {

null;

finally?{

this);

}

</td>

</tr></table>

执行提交的任务,执行任务前 后可以 各进行 一些处理,目前默认实现是什么也不做,扩展的类可以实现它。

private?void?runTask(Runnable task) {

final?ReentrantLock runLock =?this.runLock;

try?{

if?(runState?STOP?&&

interrupted() &&

=?STOP)

boolean?ran =?false;

try?{

true;

null);

catch?(RuntimeException ex) {

if?(!ran)

throw?ex;

finally?{

??????? }

</td>

</tr></table>

下面的方法是 工作线程销毁钱调用的方法,是在run中调用的。当池大小为0的时候,调用tryterminate 方法。

@param?w the worker?此方法在ThreadPoolExecutor?中

void?workerDone(Worker w) {

final?ReentrantLock mainLock =?this.mainLock;

try?{

if?(--poolSize?== 0)

finally?{

??? }

</td>

</tr></table>

这个方法只有在线程池的状态是是stop 或者shutdown的时候才会真正的关闭整个线程池。另外shutdown也会调用这个方法。

private?void?tryTerminate() {

if?(poolSize?== 0) {

int?state =?runState;

if?(state STOP?&& !workQueue.isEmpty()) {

RUNNING;?// disable termination check below

null);

if?(t !=?null)

if?(state ==?STOP?|| state ==?SHUTDOWN) {

TERMINATED;

??? }

</td>

</tr></table>

FutureTask

此类是RunnableFuture的实现类。线程池执行的run方法是它的run方法。它委托给Sync实现,SYNC 继承AQS。

public?void?run() {

??? }

</td>

</tr></table>

重点看Sync。对具体任务的调用发生在innerSet(callable.call());这句调用,innerSet的方法 作用是 设置get方法的返回值。

void?innerRun() {

if?(!compareAndSetState(0,?RUNNING))

return;

try?{

currentThread();

if?(getState() ==?RUNNING)?// recheck after setting thread

else

catch?(Throwable ex) {

??????? }

</td>

</tr></table>

//设置后才释放锁。

void?innerSet(V v) {

for?(;;) {

int?s = getState();

if?(s ==?RAN)

return;

if?(s ==?CANCELLED) {

return;

if?(compareAndSetState(s,?RAN)) {

return;

??????? }

</td>

</tr></table>

而get方法是需要获取锁的,所以在具体的任务没有执行完前,调用get方法会进入到阻塞状态。

throws?InterruptedException,ExecutionException {

if?(getState() ==?CANCELLED)

throw?new?CancellationException();

if?(exception?!=?null)

throw?new?ExecutionException(exception);

return?result;

??????? }

</td>

</tr></table>

参考

框架介绍,比较广泛

原子类

锁的原理

此人是淘宝大神,原子操作的实现

?? java线程阻塞中断和LockSupport的常见问题

??????????????????? ??对比synchronized与java.util.concurrent.locks.Lock的异同

AQS

? 比较清晰的解释AQS的实现。? 这点给我的启示是,看源代码的时候,如果碰到 抽象类,那么跟它的实现类 结合一起看,搞清楚调用关系(这里肯定是模板模式,调用关系单单看抽象类看不明白)

java内置的锁机制

? Inside AbstractQueuedSynchronizer 系列,写的非常精彩

? 有关condition 的讲解。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!