java – 从CompletableFuture调用ExecutorService.shutdownNow
当已经运行的任务之一抛出异常时,我需要取消所有已调度但尚未运行的CompletableFuture任务.
尝试以下示例,但大多数情况下main方法不会退出(可能是由于某种类型的死锁). public static void main(String[] args) { ExecutorService executionService = Executors.newFixedThreadPool(5); Set< CompletableFuture<?> > tasks = new HashSet<>(); for (int i = 0; i < 1000; i++) { final int id = i; CompletableFuture<?> c = CompletableFuture .runAsync( () -> { System.out.println("Running: " + id); if ( id == 400 ) throw new RuntimeException("Exception from: " + id); },executionService ) .whenComplete( (v,ex) -> { if ( ex != null ) { System.out.println("Shutting down."); executionService.shutdownNow(); System.out.println("shutdown."); } } ); tasks.add(c); } try{ CompletableFuture.allOf( tasks.stream().toArray(CompletableFuture[]::new) ).join(); }catch(Exception e) { System.out.println("Got async exception: " + e); }finally { System.out.println("DONE"); } } 上次打印输出是这样的: Running: 402 Running: 400 Running: 408 Running: 407 Running: 406 Running: 405 Running: 411 Shutting down. Running: 410 Running: 409 Running: 413 Running: 412 shutdown. 尝试在单独的线程上运行shutdownNow方法,但它仍然在大多数情况下给出相同的死锁. 知道什么可能导致这种僵局吗? 您认为在抛出异常时取消所有已安排但尚未运行的CompletableFutures的最佳方法是什么? 正在考虑迭代任务并在每个CompletableFuture上调用cancel.但我不喜欢这个是从连接抛出CancellationException. 解决方法
你应该记住这一点
CompletableFuture<?> f = CompletableFuture.runAsync(runnable,executionService); 基本上相当于 CompletableFuture<?> f = new CompletableFuture<>(); executionService.execute(() -> { if(!f.isDone()) { try { runnable.run(); f.complete(null); } catch(Throwable t) { f.completeExceptionally(t); } } }); 因此,ExecutorService对CompletableFuture一无所知,因此,它通常无法取消它.它所拥有的只是一些工作,表现为Runnable的一个实现. 换句话说,shutdownNow()将阻止执行挂起的作业,剩余的期货将无法正常完成,但不会取消它们.然后,你对allOf返回的未来调用join(),由于未完成的期货,它将永远不会返回. 但请注意,预定的工作确实在做任何昂贵的事情之前检查未来是否已经完成. 因此,如果您将代码更改为 ExecutorService executionService = Executors.newFixedThreadPool(5); Set<CompletableFuture<?>> tasks = ConcurrentHashMap.newKeySet(); AtomicBoolean canceled = new AtomicBoolean(); for(int i = 0; i < 1000; i++) { final int id = i; CompletableFuture<?> c = CompletableFuture .runAsync(() -> { System.out.println("Running: " + id); if(id == 400) throw new RuntimeException("Exception from: " + id); },executionService); c.whenComplete((v,ex) -> { if(ex != null && canceled.compareAndSet(false,true)) { System.out.println("Canceling."); for(CompletableFuture<?> f: tasks) f.cancel(false); System.out.println("Canceled."); } }); tasks.add(c); if(canceled.get()) { c.cancel(false); break; } } try { CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join(); } catch(Exception e) { System.out.println("Got async exception: " + e); } finally { System.out.println("DONE"); } executionService.shutdown(); 一旦相关的未来被取消,runnable将不会被执行.由于取消和普通执行之间存在竞争,因此将操作更改为可能会有所帮助 .runAsync(() -> { System.out.println("Running: " + id); if(id == 400) throw new RuntimeException("Exception from: " + id); LockSupport.parkNanos(1000); },executionService); 模拟一些实际工作量.然后,您将看到在遇到异常后执行的操作较少. 由于异步异常甚至可能在提交循环仍在运行时发生,因此它使用AtomicBoolean来检测这种情况并在这种情况下停止循环. 请注意,对于CompletableFuture,取消和任何其他特殊完成之间没有区别.调用f.cancel(…)等同于f.completeExceptionally(new CancellationException()).因此,由于CompletableFuture.allOf在异常情况下报告任何异常,因此很可能是CancellationException而不是触发异常. 如果用完全(null)替换两个cancel(false)调用,则会得到类似的效果,runnables将不会对已经完成的期货执行,但allOf将报告原始异常,因为它是唯一的例外.它还有另一个积极的作用:使用null值完成比构造CancellationException(对于每个未来的未来)便宜得多,因此通过complete(null)强制完成的运行速度要快得多,从而阻止了更多的未来执行. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |