Java ExecutorService invokeAll()中断
发布时间:2020-12-15 00:41:37 所属栏目:Java 来源:网络整理
导读:我有一个宽度为10的固定线程池ExecutorService,以及一个100 Callable列表,每个等待20秒并记录它们的中断. 我在一个单独的线程中调用该列表中的invokeAll,几乎立即中断该线程. ExecutorService执行按预期中断,但Callables记录的实际中断数远远超过预期10 –
我有一个宽度为10的固定线程池ExecutorService,以及一个100 Callable列表,每个等待20秒并记录它们的中断.
我在一个单独的线程中调用该列表中的invokeAll,几乎立即中断该线程. ExecutorService执行按预期中断,但Callables记录的实际中断数远远超过预期10 – 大约20-40.为什么这样,如果ExecutorService可以同时执行不超过10个线程? 完整来源:(由于并发,您可能需要运行一次) @Test public void interrupt3() throws Exception{ int callableNum = 100; int executorThreadNum = 10; final AtomicInteger interruptCounter = new AtomicInteger(0); final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum); final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>(); for (int i = 0; i < callableNum; ++i) { executeds.add(new Waiter(interruptCounter)); } Thread watcher = new Thread(new Runnable() { @Override public void run(){ try { executorService.invokeAll(executeds); } catch(InterruptedException ex) { // NOOP } } }); watcher.start(); Thread.sleep(200); watcher.interrupt(); Thread.sleep(200); assertEquals(10,interruptCounter.get()); } // This class just waits for 20 seconds,recording it's interrupts private class Waiter implements Callable <Object> { private AtomicInteger interruptCounter; public Waiter(AtomicInteger interruptCounter){ this.interruptCounter = interruptCounter; } @Override public Object call() throws Exception{ try { Thread.sleep(20000); } catch(InterruptedException ex) { interruptCounter.getAndIncrement(); } return null; } } 使用WinXP 32位,Oracle JRE 1.6.0_27和JUnit4 解决方法
我不同意你应该只收到10个中断的假设.
Assume the CPU has 1 core. 1. Main thread starts Watcher and sleeps 2. Watcher starts and adds 100 Waiters then blocks 3. Waiter 1-10 start and sleep in sequence 4. Main wakes and interrupts Watcher then sleeps 5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts) 6. Waiter 11-13 start and sleep 7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts) 8. Waiter 14-20 are "started" resulting in a no-op 9. Waiter 21-24 start and sleep .... 本质上,我的论点是,无法保证Watcher线程在允许生成时间片并允许ExecutorService的工作线程启动更多Waiter任务之前,将允许取消所有100个“Waiter”RunnableFuture实例. 更新:显示来自AbstractExecutorService的代码 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } for (Future<T> f : futures) { if (!f.isDone()) { try { f.get(); //If interrupted,this is where the InterruptedException will be thrown from } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; return futures; } finally { if (!done) for (Future<T> f : futures) f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads } } 包含f.cancel(true)的finally块是当中断传播到当前正在运行的任务时.如您所见,这是一个紧凑的循环,但不能保证执行循环的线程能够在一个时间片中遍历Future的所有实例. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |