我想实现一个线程池,通过覆盖afterExecute挂钩可以在特定时间内执行任务。我可以再次提交参数Runnable r吗?

这是我最初的实现。

public class RetriableThreadPool extends ThreadPoolExecutor {

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.submit(r);
    }
  }

}


在此初始实施中,我只允许提交一个任务。

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

  public static void main(String[] args) {
    RetriableThreadPool retriableThreadPool = new RetriableThreadPool(10, 10, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    retriableThreadPool.execute(new Runnable() {
      int num = 0;

      @Override
      public void run() {
        // TODO Auto-generated method stub
        num = num + 123;
        System.out.println(num);
      }

    });
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    // retriableThreadPool.shutdown();
  }
}


在此示例中,我得到了奇怪的输出:

123
246


如果可以重新提交runnable,我想我应该得到5个输出。如果无法重新提交。结果应该只有123。我不明白此输出的原因。



我感谢nogard修改了代码

public class RetriableThreadPool extends ThreadPoolExecutor {

  static final int MAXRETRYTIMES = 5;

  int retryTimes = 0;

  public RetriableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
      TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    retryTimes = 0;
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (retryTimes < MAXRETRYTIMES) {
      retryTimes++;
      super.execute(r);
    }
  }
}


我还有其他3个问题:


如何以原始状态重试可运行对象。在这种情况下,我希望结果是123的5倍
如何为方法submit添加挂钩,就像afterExecuteexecute添加钩子一样
已经有可重用线程池的良好实现了吗?抛出异常或可调用返回某些结果时,我想重试runnable。

最佳答案

我认为这种行为的原因是您在afterExecute方法中提交了任务,而不是执行,并且提交不会再次触发afterExecute回调。这就是为什么您在输出中仅看到两行的原因:第一行来自原始execute,第二行来自Submit。

而且,您永远不会增加重试计数器,您的任务将始终被重新提交

@Override
protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    ++ retryTimes;
    if (retryTimes < MAXRETRYTIMES) {
        super.execute(r);
    }
}


更新您的3个问题:


有多种选择:


不要更改Runnable内部的状态(不要分配给num)
创建Runnable的新实例(或复制实例)
重置可运行状态

对于挂钩,我将使用Decorator模式实现:

public class YourExecutor {
@Override
public void submit(Runnable task) {
    return super.submit(new TaskDecorator(task));
}

protected void onCompletedTask(Runnable task) {
    // callback
}

private class TaskDecorator implements Runnable {
    private final Runnable delegate;

    public TaskDecorator(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        this.delegate.run();
        onCompletedTask(delegate);
    }
}

08-07 07:11
查看更多