private?boolean?doAcquireNanos(int?arg,?long?nanosTimeout)
throws?InterruptedException {
long?lastTime = System.nanoTime();
final?Node node = addWaiter(Node.EXCLUSIVE);
try?{
for?(;;) {
final?Node p = node.predecessor();
if?(p ==?head?&& tryAcquire(arg)) {
null;?// help GC
return?true;
if?(nanosTimeout <= 0) {
return?false;
if?(nanosTimeout >?spinForTimeoutThreshold?&&
shouldParkAfterFailedAcquire(p,node))
parkNanos(this,nanosTimeout);
long?now = System.nanoTime();
if?(Thread.interrupted())
break;
catch?(RuntimeException ex) {
throw?ex;
throw?new?();
??? }
</td>
</tr></table>
Condition
Condition 实现了与java内容monitor 类似的功能。提供 await,signal,signalall 等操作,与object . wait等一系列操作对应。不同的是一个condition 可以有多个条件队列。这点内置monitor 是做不到的。另外还支持 超时、取消等更加灵活的方式。
和内置的Monitor一样,调用 Condition。aWait 等操作,需要获得锁,也就是 Condition 是和一个锁绑定在一起的。它的实现 是在AQS中,基本思想如下:一下内容抄自博客:
public final void await() throws InterruptedException {
??? // 1.如果当前线程被中断,则抛出中断异常
??? if (Thread.interrupted())
??????? throw newInterruptedException();
??? // 2.将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。
??? Node node = addConditionWaiter();
??? // 3.调用tryRelease,释放当前线程的锁
??? long savedState =fullyRelease(node);
??? int interruptMode = 0;
??? // 4.为什么会有在AQS的等待队列的判断?
??? // 解答:signal*作会将Node从Condition队列中拿出并且放入到等待队列中去,在不在AQS等待队列就看signal是否执行了
??? // 如果不在AQS等待队列中,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环
??? while (!isOnSyncQueue(node)) {
??????? LockSupport.park(this);
??????? if ((interruptMode =checkInterruptWhileWaiting(node)) != 0)
??????????? break;
??? }
??? // 5.这个时候线程已经被signal()或者signalAll()*作给唤醒了,退出了4中的while循环
??? // 自旋等待尝试再次获取锁,调用acquireQueued方法
??? if (acquireQueued(node,savedState) && interruptMode != THROW_IE)
??????? interruptMode = REINTERRUPT;
??? if (node.nextWaiter != null)
??????? unlinkCancelledWaiters();
??? if (interruptMode != 0)
??????? reportInterruptAfterWait(interruptMode);
}
整个await的过程如下:
1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。进行2。
2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。进行3。
3.自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。进行4。
4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。
可以看到,这个await的*作过程和Object.wait()方法是一样,只不过await()采用了Condition队列的方式实现了Object.wait()的功能。
signal和signalAll方法
await*()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await*()中FIFO队列中第一个Node唤醒(或者全部Node)唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。
?Java Code
public final void signal() {
??? if (!isHeldExclusively())
??????? throw newIllegalMonitorStateException();
??? Node first = firstWaiter;
??? if (first != null)
??????? doSignal(first);
}
这里先判断当前线程是否持有锁,如果没有持有,则抛出异常,然后判断整个condition队列是否为空,不为空则调用doSignal方法来唤醒线程,看看doSignal方法都干了一些什么:
?Java Code
private void doSignal(Node first) {
??? do {
??????? if ( (firstWaiter =first.nextWaiter) == null)
??????????? lastWaiter = null;
??????? first.nextWaiter = null;
??? } while(!transferForSignal(first) &&
???????????? (first = firstWaiter)!= null);
}
上面的代*很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。
?Java Code
final boolean transferForSignal(Node node) {
??? /*
???? * 设置node的waitStatus:Condition->0
???? */
??? if(!compareAndSetWaitStatus(node,Node.CONDITION,0))
??????? return false;
??? /*
???? * 加入到AQS的等待队列,让节点继续获取锁
???? * 设置前置节点状态为SIGNAL
???? */
??? Node p = enq(node);
??? int c = p.waitStatus;
??? if (c > 0 ||!compareAndSetWaitStatus(p,c,Node.SIGNAL))
???????LockSupport.unpark(node.thread);
??? return true;
}
上面就是唤醒一个await*()线程的过程,根据前面的介绍,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)*作,将当前节点加入到AQS队列。
signalAll和signal方法类似,主要的不同在于它不是调用doSignal方法,而是调用doSignalAll方法:
?Java Code
private void doSignalAll(Node first) {
??? lastWaiter = firstWaiter? = null;
??? do {
??????? Node next =first.nextWaiter;
??????? first.nextWaiter = null;
??????? transferForSignal(first);
??????? first = next;
??? } while (first != null);
}
这个方法就相当于把Condition队列中的所有Node全部取出插入到等待队列中去。
线程池
JUC 中提供了线程池的实现,其基于一系列的抽象和接口。接下里一步一步解开线程池的神秘面纱。
首先应该了解线程池的使用。J U C 提供了一个 构造线程池的 工厂类。java.util.concurrent.Executors 。此工厂提供了构造各种不同类型线程池的静态方法。如固定线程池,单一工作线程池,和缓存线程池等。
如下代码构造了一个具有2个固定工作线程的线程池。
ExecutorService ser = Executors.newFixedThreadPool(2);
</td>
</tr></table>
经过跟踪,此构造函数最终调用如下,其参数解释如下:
<table border="0" cellspacing="0" cellpadding="0">
<tr>
<td valign="top">
<p align="left">public?ThreadPoolExecutor(int?corePoolSize,
<p align="left">???????? ?????????????????????int?maximumPoolSize,
<p align="left">??????????????????????????????long?keepAliveTime,
<p align="left">????????????????????????????? TimeUnit unit,
<p align="left">????????????????????????????? BlockingQueue workQueue,
<p align="left">????????????????????????????? ThreadFactory threadFactory,
<p align="left">????????????????????????????? RejectedExecutionHandler handler) {
<p align="left">????????if?(corePoolSize < 0 ||
<p align="left">??????????? maximumPoolSize <= 0 ||
<p align="left">??????????? maximumPoolSize < corePoolSize ||
<p align="left">??????????? keepAliveTime < 0)
<p align="left">????????????throw?new?IllegalArgumentException();
<p align="left">????????if?(workQueue ==?null?|| threadFactory ==?null?|| handler ==?null)
<p align="left">????????????throw?new?NullPointerException();
<p align="left">????????this.corePoolSize?= corePoolSize;
<p align="left">????????this.maximumPoolSize?= maximumPoolSize;
<p align="left">????????this.workQueue?= workQueue;
<p align="left">????????this.keepAliveTime?= unit.toNanos(keepAliveTime);
<p align="left">????????this.threadFactory?= threadFactory;
<p align="left">????????this.handler?= handler;
??? }
</td>
</tr>
</table>
我们构造的线程池的类型是?ExecutorService,ThreadPoolExecutor继承AbstractExecutorService,其总体类图如下,可以看到最初的抽象是Exector。
接口Executor
该接口只有一个方法,JDK解释如下
执行已提交的Runnable?任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:
class DirectExecutor implements Executor {
???? public void execute(Runnable r) {
???????? r.run();
???? }
?}
</td>
</tr></table>
更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor {
???? public void execute(Runnable r) {
???????? new Thread(r).start();
???? }
?}
</td>
</tr></table>
方法介绍如下:
void execute(?command)
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,这由Executor实现决定。
接口ExecutorService
ExecutorService 是对?Executor?的扩展,JDK文档解释如下:
此接口中的关键是三个submit 方法,接受一个任务,并返回结果Future。
Future submit(Callable task);
Future submit(Runnable task,T result);
Future> submit(Runnable task);
其中Callable 就是带返回结果的Runnable。定义如下:
public?interface?Callable {
@return?computed result
@throws?Exception if unable to compute a result
throws?Exception;
}
精彩的是返回一个表示任务的未决结果的 Future。该 Future 的get?方法在成功完成时将会返回该任务的结果。注意这些过程是异步的。
接口Future
JDK解释如下:
Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future> 形式类型、并返回 null 作为底层任务的结果。
它的方法简介如下:
?boolean
cancel(boolean?mayInterruptIfRunning)
???????????试图取消对此任务的执行。
</td>
</tr>
<tr>
<td valign="top">
<p align="right">?
<a title="Future 中的类型参数" href="http://blog.csdn.net/windsunmoon/article/details/36903901" target="_blank">V
</td>
<td>
get()
???????????如有必要,等待计算完成,然后获取其结果。
</td>
</tr>
<tr>
<td valign="top">
<p align="right">?
<a title="Future 中的类型参数" href="http://blog.csdn.net/windsunmoon/article/details/36903901" target="_blank">V
</td>
<td>
get(long?timeout,?
?unit)
???????????如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
</td>
</tr>
<tr>
<td valign="top">
<p align="right">?boolean
</td>
<td>
isCancelled()
???????????如果在任务正常完成前将其取消,则返回?true。
</td>
</tr>
<tr>
<td valign="top">
<p align="right">?boolean
</td>
<td>
isDone()
???????????如果任务已完成,则返回?true。
</td>
</tr>
Submit后发生的事情
有了以上的一些基本了解,接下来看当任务提交之后发生的一系列过程。
Submit 的实际代码位于AbstractExecutorService,继承ExecutorService。来观察其三个submit方法。
构造RunnableFuture
public?Future> submit(Runnable task) {
if?(task ==?null)?throw?new?NullPointerException();
ftask = newTaskFor(task,?null);
return?ftask;
public? Future submit(Runnable task,T result) {
if?(task ==?null)?throw?new?NullPointerException();
ftask = newTaskFor(task,result);
return?ftask;
public? Future submit(Callable task) {
if?(task ==?null)?throw?new?NullPointerException();
ftask = newTaskFor(task);
return?ftask;
??? }
</td>
</tr></table>
可以看出,不论submit 方法的参数是什么,都是先构造一个RunnableFuture ,然偶执行它,并返回它。执行和返回的都是RunnableFuture。所以RunnableFuture实现了future 接口和runnnable接口。注意这点的类型是RunnableFuture,所有接下来的execute方法执行的run方法是RunnableFuture?的具体实现类FutureTask的run方法。
来看RunnableFuture,其代码如下:
run?method causes completion of the?Future
@see?FutureTask
@see?Executor
@since?1.6
@author?Doug Lea
@param?The result type returned by this Future's?get?method
public?interface?RunnableFuture?extends?Runnable,Future {
void?run();
}
</td>
</tr></table>
作为??的?。成功执行 run 方法可以完成 Future 并允许访问其结果。以下代码可以看出 返回的实际上是FutureTask,为RunnableFuture的实现类。
protected? RunnableFuture newTaskFor(Runnable runnable,T value) {
return?new?FutureTask(runnable,value);}
</td>
</tr></table>
关于 FutureTask ?JDK对其介绍如下:
可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对?的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
可使用FutureTask 包装??或??对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给??执行。
先看其构造函数。可以看出其构造函数主要是一个 同步器的构造。同步器接受一个Callable类型的参数。
public?FutureTask(Callable callable) {
if?(callable ==?null)
throw?new?NullPointerException();
new?Sync(callable);
public?FutureTask(Runnable runnable,V result) {
new?Sync(Executors.callable(runnable,result));
??? }
</td>
</tr></table>
对于参数是Runnable 类型时,经过转化为Callable 类型,转化代码如下,本质上就是在Callable 的call方法中调用Runnable的run方法:
public?static? Callable callable(Runnable task,T result) {
if?(task ==?null)
throw?new?NullPointerException();
return?new?RunnableAdapter(task,result);
}
static?final?class?RunnableAdapter?implements?Callable {
final?Runnable?task;
final?T?result;
this.task?= task;
this.result?= result;
public?T call() {
return?result;
??? }
</td>
</tr></table>
FutureTask的关键逻辑都由他的一个内部类Sync 实现。我们先暂且不管其具体实现,留在后面说。
执行
接下来看 执行任务。Execute 方法实现在ThreadPoolExecutor 类中,这是具体的线程池。其Execute 方法如下:
public?void?execute(Runnable command) {
if?(command ==?null)
throw?new?NullPointerException();
if?(poolSize?>=?corePoolSize?|| !addIfUnderCorePoolSize(command)) {
if?(runState?==?RUNNING?&&?workQueue.offer(command)) {
if?(runState?!=?RUNNING?||?poolSize?== 0)
else?if?(!addIfUnderMaximumPoolSize(command))//这里是再给一次机会
??? }
</td>
</tr></table>
具体的逻辑如下描述:
首先判断空;
如果当前池大小 小于 核心池大小(初始就是这样),那么会执行 addIfUnderCorePoolSize这个方法。这个方法就会创建新的工作线程,且把当前任务 command 设置为他的第一个任务,并开始执行,返回true。整个execute方法结束。(1)否则加入到等待队列中。(2)
执行情况1
先看情况1:如下代码,只有当前池大小小于核心池大小的时候,且线程池处于RUNNING状态的时候才增加新的工作线程,并把传进来的任务作为第一个任务并开始执行。此时返回真,否则返回假。
@param?firstTask the task the new thread should run first (or
@return?true if successful
private?boolean?addIfUnderCorePoolSize(Runnable firstTask) {
null;
final?ReentrantLock mainLock =?this.mainLock;
try?{
if?(poolSize?RUNNING)
finally?{
if?(t ==?null)
return?false;
return?true;
}
<p align="left">/*
<p align="left">???? Creates and returns a new thread running firstTask as its first
<p align="left">???? task. Call only while holding mainLock.
<p align="left">????
<p align="left">???? ?@param?firstTask the task the new thread should run first (or
<p align="left">???? null if none)
<p align="left">???? ?@return?the new thread,or null if threadFactory fails to create thread
<p align="left">???? /
<p align="left">????private?Thread addThread(Runnable firstTask) {
<p align="left">??????? Worker w =?new?Worker(firstTask);//工作线程,
<p align="left">??????? Thread t =?threadFactory.newThread(w);//封装成线程
<p align="left">????????if?(t !=?null) {
<p align="left">??????????? w.thread?= t;
<p align="left">????????????workers.add(w);
<p align="left">????????????int?nt = ++poolSize;
<p align="left">????????????if?(nt >?largestPoolSize)
<p align="left">????????????????largestPoolSize?= nt;
<p align="left">??????? }
<p align="left">????????return?t;
??? }
</td>
</tr></table>
执行情况2
如果当前池大小 大于核心池的大小,或者添加新的工作线程失败(这可能是多线程环境下,竞争锁,被阻塞,其他线程已经创建好了工作线程)。那么当前任务进入到等待队列。
如果队列满,或者线程池已经关闭,那么拒绝该任务。
工作线程worker
对工作线程的封装是类Worker,它实现了Runnable接口。addThread方法把Worker 组成线程(用threadFactory),并加入线程池,重新设置线程池 大小的 达到的最大值。
重点研究下worker的run方法,首先运行第一个任务,以后通过getTask()获取新的任务,如果得不到,工作线程会自动结束,在结束前 会执行一些工作,见后面。
public?void?run() {
try?{
null;
while?(task !=?null?|| (task = getTask()) !=?null) {
null;
finally?{
this);
}
</td>
</tr></table>
执行提交的任务,执行任务前 后可以 各进行 一些处理,目前默认实现是什么也不做,扩展的类可以实现它。
private?void?runTask(Runnable task) {
final?ReentrantLock runLock =?this.runLock;
try?{
if?(runState?STOP?&&
interrupted() &&
=?STOP)
boolean?ran =?false;
try?{
true;
null);
catch?(RuntimeException ex) {
if?(!ran)
throw?ex;
finally?{
??????? }
</td>
</tr></table>
下面的方法是 工作线程销毁钱调用的方法,是在run中调用的。当池大小为0的时候,调用tryterminate 方法。
@param?w the worker?此方法在ThreadPoolExecutor?中
void?workerDone(Worker w) {
final?ReentrantLock mainLock =?this.mainLock;
try?{
if?(--poolSize?== 0)
finally?{
??? }
</td>
</tr></table>
这个方法只有在线程池的状态是是stop 或者shutdown的时候才会真正的关闭整个线程池。另外shutdown也会调用这个方法。
private?void?tryTerminate() {
if?(poolSize?== 0) {
int?state =?runState;
if?(state STOP?&& !workQueue.isEmpty()) {
RUNNING;?// disable termination check below
null);
if?(t !=?null)
if?(state ==?STOP?|| state ==?SHUTDOWN) {
TERMINATED;
??? }
</td>
</tr></table>
FutureTask
此类是RunnableFuture的实现类。线程池执行的run方法是它的run方法。它委托给Sync实现,SYNC 继承AQS。
public?void?run() {
??? }
</td>
</tr></table>
重点看Sync。对具体任务的调用发生在innerSet(callable.call());这句调用,innerSet的方法 作用是 设置get方法的返回值。
void?innerRun() {
if?(!compareAndSetState(0,?RUNNING))
return;
try?{
currentThread();
if?(getState() ==?RUNNING)?// recheck after setting thread
else
catch?(Throwable ex) {
??????? }
</td>
</tr></table>
//设置后才释放锁。
void?innerSet(V v) {
for?(;;) {
int?s = getState();
if?(s ==?RAN)
return;
if?(s ==?CANCELLED) {
return;
if?(compareAndSetState(s,?RAN)) {
return;
??????? }
</td>
</tr></table>
而get方法是需要获取锁的,所以在具体的任务没有执行完前,调用get方法会进入到阻塞状态。
throws?InterruptedException,ExecutionException {
if?(getState() ==?CANCELLED)
throw?new?CancellationException();
if?(exception?!=?null)
throw?new?ExecutionException(exception);
return?result;
??????? }
</td>
</tr></table>
参考
框架介绍,比较广泛
原子类
锁的原理
此人是淘宝大神,原子操作的实现
?? java线程阻塞中断和LockSupport的常见问题
??????????????????? ??对比synchronized与java.util.concurrent.locks.Lock的异同
AQS
? 比较清晰的解释AQS的实现。? 这点给我的启示是,看源代码的时候,如果碰到 抽象类,那么跟它的实现类 结合一起看,搞清楚调用关系(这里肯定是模板模式,调用关系单单看抽象类看不明白)
java内置的锁机制
? Inside AbstractQueuedSynchronizer 系列,写的非常精彩
? 有关condition 的讲解。
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!