Java:停止可调用线程的设计思路
我正在编写一个程序来进行一些批处理.批处理元素可以彼此独立地处理,我们希望最小化整体处理时间.因此,我不是一次循环遍历批处理中的每个元素,而是使用ExecutorService并向其提交Callable对象:
public void process(Batch batch) { ExecutorService execService = Executors.newCachedThreadPool(); CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>(); for (BatchElement element : batch.getElement()) { Future<MtaMigrationStatus> future = execService.submit(new ElementProcessor(batch.getID(),element)); futures.add(future); } boolean done = false; while (!done) { for (Future<BatchElementStatus> future : futures) { try { if (future.isDone()) { futures.remove(future); } } catch (Exception e) { System.out.println(e.getMessage()); } if (futures.size() == 0) { done = true; } } } } 我们希望能够取消批处理.因为我没有使用循环,所以如果设置了取消标志,我不能只检查每个循环的顶部. 我们正在使用一个JMS主题,BatchProcessor和ElementProcessor将监听它,告知它们批处理已被取消. ElementProcess call()中有许多步骤,之后可以安全地停止其中一些处理,但是有一个不可返回的点.该课程有这个基本设计: public class ElementProcessor implements Callable,MessageListener { private cancelled = false; public void onMessage(Message msg) { // get message object cancelled = true; } public BatchElementStatus call() { String status = SUCCESS; if (!cancelled) { doSomehingOne(); } else { doRollback(); status = CANCELLED; } if (!cancelled) { doSomehingTwo(); } else { doRollback(); status = CANCELLED; } if (!cancelled) { doSomehingThree(); } else { doRollback(); status = CANCELLED; } if (!cancelled) { doSomehingFour(); } else { doRollback(); status = CANCELLED; } // After this point,we cannot cancel or pause the processing doSomehingFive(); doSomehingSix(); return new BatchElementStatus("SUCCESS"); } } 我想知道是否有更好的方法来检查批处理/元素是否已被取消,而不是在if(!cancel)语句的调用方法中包装方法调用/代码块. 有什么建议? 解决方法
我认为你不能比你现在做的更好,但这里有一个替代方案:
public BatchElementStatus call() { return callMethod(1); } private callMethod(int methodCounter) { if (cancelled) { doRollback(); return new BatchElementStatus("FAIL"); } switch (methodCounter) { case 1 : doSomethingOne(); break; case 2 : doSomethingTwo(); break; ... case 5 : doSomethingFive(); doSomethingSix(); return new BatchElementStatus("SUCCESS"); } return callMethod(methodCounter + 1); } 此外,您希望取消volatile,因为onMessage将从另一个线程调用.但是你可能不想使用onMessage并取消(见下文). 其他要点:1)CopyOnWriteArrayList< Future< BatchElementStatus>>期货应该只是一个ArrayList.使用并发集合误导我们认为期货在很多线程上. 2)while(!done)应该用while(!futures.isEmpty())替换,并完成删除. 3)你可能应该只调用future.cancel(true)而不是“messaging”取消.然后你必须检查(Thread.interrupted())而不是if(取消).如果你想杀死所有的未来,那么只需调用execService.shutdownNow();您的任务必须处理中断才能使其正常工作. 编辑: 你应该使用ExecutorCompletionService而不是你的while(!done){for(… futures){…}}.它会做你想要做的事情,它可能做得更好. API中有一个完整的示例. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |