Problem description
I have the following code snippet that scans through a list of tasks and hands each task to an executor for execution. The JobExecutor in turn creates another executor (for doing DB work: reading data and writing it to a queue) and completes the task. Each submitted JobExecutor yields a Future<Boolean>. When one of the tasks fails, I want to gracefully interrupt all the threads and shut down the executor, catching all the exceptions. What changes do I need to make?
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class DataMovingClass {
    private static final AtomicInteger uniqueId = new AtomicInteger(0);
    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();
    ThreadPoolExecutor threadPoolExecutor = null;
    private List<Source> sources = new ArrayList<Source>();

    // Every call to get() hands out the next id, which is used to name threads.
    private static class IDGenerator extends ThreadLocal<Integer> {
        @Override
        public Integer get() {
            return uniqueId.incrementAndGet();
        }
    }

    public void init() {
        // load sources list
    }

    public boolean execute() {
        boolean success = true;
        threadPoolExecutor = new ThreadPoolExecutor(10, 10,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("DataMigration-" + uniqueNumber.get());
                        return t;
                    }
                }, new ThreadPoolExecutor.CallerRunsPolicy());
        List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
        for (Source source : sources) {
            result.add(threadPoolExecutor.submit(new JobExecutor(source)));
        }
        for (Future<Boolean> jobDone : result) {
            try {
                if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
                    // in case of successful DbWriterClass, we don't need to change
                    // it.
                    success = false;
                }
            } catch (Exception ex) {
                // handle exceptions
            }
        }
        return success;
    }

    public class JobExecutor implements Callable<Boolean> {
        private ThreadPoolExecutor threadPoolExecutor;
        Source jobSource;

        public JobExecutor(Source source) {
            this.jobSource = source;
            threadPoolExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1024),
                    new ThreadFactory() {
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r);
                            t.setName("Job Executor-" + uniqueNumber.get());
                            return t;
                        }
                    }, new ThreadPoolExecutor.CallerRunsPolicy());
        }

        public Boolean call() throws Exception {
            boolean status = true;
            System.out.println("Starting Job = " + jobSource.getName());
            try {
                // do the specified task ;
            } catch (InterruptedException intrEx) {
                // logger is assumed to be declared elsewhere (for example a log4j Logger)
                logger.warn("InterruptedException", intrEx);
                status = false;
            } catch (Exception e) {
                logger.fatal("Exception occurred while executing task " + jobSource.getName(), e);
                status = false;
            }
            System.out.println("Ending Job = " + jobSource.getName());
            return status;
        }
    }
}
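Recommended answer
When you submit a task to the executor, it returns a Future (for a ThreadPoolExecutor this is a FutureTask instance).
Future.get() rethrows any exception thrown by the task, wrapped in an ExecutionException.
So when you iterate through the List<Future<Boolean>> and call get() on each, catch ExecutionException and shut the executor down. Note that shutdown() is the orderly variant: it stops new submissions but lets running tasks finish. To interrupt tasks that are already running, call shutdownNow() instead. Also, because call() in the question catches every exception and returns false, get() will not throw for those failures, so a false result should be treated as a failure as well.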
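The approach described in the answer can be sketched roughly as follows. This is a minimal illustration rather than a drop-in fix for the question's code: the class name FailFastDemo, the method runAll, the pool size, and the timeouts are all assumptions made for the example. It treats both a thrown exception (surfacing as ExecutionException) and a false result as failure, and calls shutdownNow() so that still-running tasks are interrupted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names, pool size and timeouts are illustrative only.
public class FailFastDemo {

    /** Returns true only if every job completed and returned true. */
    public static boolean runAll(List<Callable<Boolean>> jobs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Boolean>> results = new ArrayList<>();
        for (Callable<Boolean> job : jobs) {
            results.add(pool.submit(job));
        }

        boolean allOk = false;
        try {
            boolean ok = true;
            for (Future<Boolean> f : results) {
                // get() rethrows a task's exception wrapped in ExecutionException.
                if (!f.get(10, TimeUnit.SECONDS)) {
                    ok = false; // the task reported failure via its return value
                    break;
                }
            }
            allOk = ok;
        } catch (ExecutionException | TimeoutException e) {
            // A task threw or took too long; allOk stays false.
            System.err.println("Task failed: " + e);
        } finally {
            if (allOk) {
                pool.shutdown();      // orderly: no new tasks, running ones finish
            } else {
                pool.shutdownNow();   // interrupts running tasks, drops queued ones
            }
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return allOk;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<Boolean>> jobs = new ArrayList<>();
        jobs.add(() -> true);
        jobs.add(() -> { throw new IllegalStateException("boom"); });
        // Long-running job; sleep() is interrupted once shutdownNow() is called.
        jobs.add(() -> { Thread.sleep(60_000); return true; });
        System.out.println(runAll(jobs)); // prints "false"
    }
}
```

Interruption is cooperative: shutdownNow() only helps if the tasks respond to Thread.interrupt(), for instance by blocking in an interruptible call or checking Thread.currentThread().isInterrupted(). In the question's design, each JobExecutor owns an inner pool, so its call() would also need to shut that inner pool down (for example in a finally block) when it is interrupted.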