我想实现一个线程池,通过覆盖afterExecute
挂钩可以在特定时间内执行任务。我可以再次提交参数Runnable r
吗?
这是我最初的实现。
public class RetriableThreadPool extends ThreadPoolExecutor {
static final int MAXRETRYTIMES = 5;
int retryTimes = 0;
public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
retryTimes = 0;
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (retryTimes < MAXRETRYTIMES) {
retryTimes++;
super.submit(r);
}
}
}
在此初始实施中,我只允许提交一个任务。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ThreadPoolTest {
public static void main(String[] args) {
RetriableThreadPool retriableThreadPool = new RetriableThreadPool(10, 10, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
retriableThreadPool.execute(new Runnable() {
int num = 0;
@Override
public void run() {
// TODO Auto-generated method stub
num = num + 123;
System.out.println(num);
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// retriableThreadPool.shutdown();
}
}
在此示例中,我得到了奇怪的输出:
123
246
如果可以重新提交runnable,我想我应该得到5个输出。如果无法重新提交。结果应该只有123。我不明白此输出的原因。
我感谢nogard修改了代码
public class RetriableThreadPool extends ThreadPoolExecutor {
static final int MAXRETRYTIMES = 5;
int retryTimes = 0;
public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
retryTimes = 0;
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (retryTimes < MAXRETRYTIMES) {
retryTimes++;
super.execute(r);
}
}
}
我还有其他3个问题:
如何以原始状态重试可运行对象。在这种情况下,我希望结果是123的5倍
如何为方法
submit
添加挂钩,就像afterExecute
为execute
添加钩子一样已经有可重用线程池的良好实现了吗?抛出异常或可调用返回某些结果时,我想重试runnable。
最佳答案
我认为这种行为的原因是您在afterExecute方法中提交了任务,而不是执行,并且提交不会再次触发afterExecute
回调。这就是为什么您在输出中仅看到两行的原因:第一行来自原始execute,第二行来自Submit。
而且,您永远不会增加重试计数器,您的任务将始终被重新提交
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
++ retryTimes;
if (retryTimes < MAXRETRYTIMES) {
super.execute(r);
}
}
更新您的3个问题:
有多种选择:
不要更改Runnable内部的状态(不要分配给num)
创建Runnable的新实例(或复制实例)
重置可运行状态
对于挂钩,我将使用Decorator模式实现:
public class YourExecutor {
@Override
public void submit(Runnable task) {
return super.submit(new TaskDecorator(task));
}
protected void onCompletedTask(Runnable task) {
// callback
}
private class TaskDecorator implements Runnable {
private final Runnable delegate;
public TaskDecorator(Runnable delegate) {
this.delegate = delegate;
}
@Override
public void run() {
this.delegate.run();
onCompletedTask(delegate);
}
}