加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

java – 从CompletableFuture调用ExecutorService.shutdownNow

发布时间:2020-12-15 08:27:10 所属栏目:Java 来源:网络整理
导读:当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务. 尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁). public static void main(String[] args) { ExecutorService executionService = Execut
当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务.

尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁).

public static void main(String[] args) {
    ExecutorService executionService = Executors.newFixedThreadPool(5);

    Set< CompletableFuture<?> > tasks = new HashSet<>();

    for (int i = 0; i < 1000; i++) {
        final int id = i;
        CompletableFuture<?> c = CompletableFuture

        .runAsync( () -> {
            System.out.println("Running: " + id); 
            if ( id == 400 ) throw new RuntimeException("Exception from: " + id);
        },executionService )

        .whenComplete( (v,ex) -> { 
            if ( ex != null ) {
                System.out.println("Shutting down.");
                executionService.shutdownNow();
                System.out.println("shutdown.");
            }
        } );

        tasks.add(c);
    }

    try{ 
        CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join(); 
    }catch(Exception e) { 
        System.out.println("Got async exception: " + e); 
    }finally { 
        System.out.println("DONE"); 
    }        
}

上次打印输出是这样的:

Running: 402
Running: 400
Running: 408
Running: 407
Running: 406
Running: 405
Running: 411
Shutting down.
Running: 410
Running: 409
Running: 413
Running: 412
shutdown.

尝试在单独的线程上运行shutdownNow方法,但它仍然在大多数情况下给出相同的死锁.

知道什么可能导致这种僵局吗?

您认为在抛出异常时取消所有已安排但尚未运行的CompletableFutures的最佳方法是什么?

正在考虑迭代任务并在每个CompletableFuture上调用cancel.但我不喜欢这个是从连接抛出CancellationException.

解决方法

你应该记住这一点

CompletableFuture<?> f = CompletableFuture.runAsync(runnable,executionService);

基本上相当于

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            runnable.run();
            f.complete(null);
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});

因此,ExecutorService对CompletableFuture一无所知,因此,它通常无法取消它.它所拥有的只是一些工作,表现为Runnable的一个实现.

换句话说,shutdownNow()将阻止执行挂起的作业,剩余的期货将无法正常完成,但不会取消它们.然后,你对allOf返回的未来调用join(),由于未完成的期货,它将永远不会返回.

但请注意,预定的工作确实在做任何昂贵的事情之前检查未来是否已经完成.

因此,如果您将代码更改为

ExecutorService executionService = Executors.newFixedThreadPool(5);
Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet();
AtomicBoolean canceled = new AtomicBoolean();

for(int i = 0; i < 1000; i++) {
    final int id = i;
    CompletableFuture<?> c = CompletableFuture
        .runAsync(() -> {
            System.out.println("Running: " + id); 
            if(id == 400) throw new RuntimeException("Exception from: " + id);
        },executionService);
        c.whenComplete((v,ex) -> {
            if(ex != null && canceled.compareAndSet(false,true)) {
                System.out.println("Canceling.");
                for(CompletableFuture<?> f: tasks) f.cancel(false);
                System.out.println("Canceled.");
            }
        });
    tasks.add(c);
    if(canceled.get()) {
        c.cancel(false);
        break;
    }
}

try {
    CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
} catch(Exception e) {
    System.out.println("Got async exception: " + e);
} finally {
    System.out.println("DONE");
}
executionService.shutdown();

一旦相关的未来被取消,runnable将不会被执行.由于取消和普通执行之间存在竞争,因此将操作更改为可能会有所帮助

.runAsync(() -> {
    System.out.println("Running: " + id); 
    if(id == 400) throw new RuntimeException("Exception from: " + id);
    LockSupport.parkNanos(1000);
},executionService);

模拟一些实际工作量.然后,您将看到在遇到异常后执行的操作较少.

由于异步异常甚至可能在提交循环仍在运行时发生,因此它使用AtomicBoolean来检测这种情况并在这种情况下停止循环.

请注意,对于CompletableFuture,取消和任何其他特殊完成之间没有区别.调用f.cancel(…)等同于f.completeExceptionally(new CancellationException()).因此,由于CompletableFuture.allOf在异常情况下报告任何异常,因此很可能是CancellationException而不是触发异常.

如果用完全(null)替换两个cancel(false)调用,则会得到类似的效果,runnables将不会对已经完成的期货执行,但allOf将报告原始异常,因为它是唯一的例外.它还有另一个积极的作用:使用null值完成比构造CancellationException(对于每个未来的未来)便宜得多,因此通过complete(null)强制完成的运行速度要快得多,从而阻止了更多的未来执行.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读