我是我的代码,我向ExecutorService提交了一些任务,然后使用shutdown()和awaitTermination()等待它们完成。但是,如果任何一项任务花费的时间超过某个时间段,我希望取消它而不影响其他任务。我使用来自ExecutorService that interrupts tasks after a timeout的代码修改后的代码,如下所示:
package com.jthink.jaikoz.memory;
import com.jthink.jaikoz.MainWindow;
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private boolean isShutdown = false;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
//Map Task to the Timeout Task that could be used to interrupt it
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public long getTimeout()
{
return timeout;
}
public TimeUnit getTimeoutUnit()
{
return timeoutUnit;
}
public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
{
super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
isShutdown = true;
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
//Schedule a task to interrupt the thread that is running the task after time timeout
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
//Add Mapping
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//Remove mapping and cancel timeout task
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
if (isShutdown)
{
if(getQueue().isEmpty())
{
//Queue is empty so all tasks either finished or currently running
MainWindow.logger.severe("---Thread Pool Queue is Empty");
//timeoutExecutor.shutdownNow();
}
}
}
/**
* Interrupt the thread
*
*/
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
MainWindow.logger.severe("Cancelling task because taking too long");
thread.interrupt();
}
}
}
一个测试用例,用于说明何时有时间完成任务以及何时它们都不工作
package com.jthink.jaikoz;
import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by Paul on 08/12/2014.
*/
public class TestThreadPool extends TestCase
{
public void testThreadPoolTasksComplete() throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++)
{
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Done");
return null;
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
}
public void testThreadPoolTasksCancelled() throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++)
{
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Done");
return null;
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
}
}
在我的代码中似乎起作用:
private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
throws JaikozException
{
if (stopTask)
{
MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
return false;
}
TimeoutThreadPoolExecutor es = getExecutorService();
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
for(MatchKey matchKey:matchKeyToSongs.keySet())
{
List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
}
es.shutdown();
try
{
es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
}
catch(InterruptedException ie)
{
MainWindow.logger.warning(this.getClass() + " has been interrupted");
return false;
}
return true;
}
但是即使是一位客户
---Thread Pool Queue is Empty
输出awaitTermination()不返回,仅在用户两小时后取消任务时最终返回-此处完整日志摘录
14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted
那么,即使日志显示队列为空,因此在执行器本身和嵌入式timeoutExecutor上都调用了shutdown(),那怎么还没有返回awaiterTermination()呢?
我本人对此有一些想法,但不知道答案。
首先,为什么实际上有必要关闭TimeOutExecutor以使awaitTermination()返回。在我的子类中,awaitTermination()不会被覆盖,因此,如果所有任务都已完成,那么TiumeOutExecutor(awaitTermination()一无所知)是否关闭有什么关系呢?
其次,为什么---线程池队列为空有时会多次获得输出
最佳答案
我在TimeoutThreadPoolExecutor
中进行了自定义修改,并且工作正常。
public static class TimeoutThreadPoolExecutor extends ThreadPoolExecutor
{
private final long timeout;
private final TimeUnit timeoutUnit;
private boolean isShutdown = false;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
isShutdown = true;
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
if (isShutdown) timeoutExecutor.shutdown();
}
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
thread.interrupt();
System.out.println("Cancelled");
}
}
}
情况1:无超时
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
6, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Done");
return null;
}
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
它打印:
Task done
Program done
情况2:超时
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(
100, 100, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
3, TimeUnit.SECONDS);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Task done");
return null;
}
});
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
它打印:
Cancelled
Program done
关于java - 如何使用此自定义ExecutorService使关机正常工作?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/27207797/