我有一个 ThreadPoolExecutor,当我调用 getActiveCount() 时它似乎在骗我。然而,我没有做过很多多线程编程,所以也许我做错了什么。

这是我的 TPE

@Override
public void afterPropertiesSet() throws Exception {

    BlockingQueue<Runnable> workQueue;
    int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
    if (maxQueueLength == 0) {
        workQueue = new LinkedBlockingQueue<Runnable>();
    } else {
        workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
    }

    pool = new ThreadPoolExecutor(
                   threadPoolConfiguration.getCorePoolSize(),
                   threadPoolConfiguration.getMaximumPoolSize(),
                   threadPoolConfiguration.getKeepAliveTime(),
                   TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
                   workQueue,
                   // Default thread factory creates normal-priority,
                   // non-daemon threads.
                   Executors.defaultThreadFactory(),
                   // Run any rejected task directly in the calling thread.
                   // In this way no records will be lost due to rejection
                   // however, no records will be added to the workQueue
                   // while the calling thread is processing a Task, so set
                   // your queue-size appropriately.
                   //
                   // This also means MaxThreadCount+1 tasks may run
                   // concurrently. If you REALLY want a max of MaxThreadCount
                   // threads don't use this.
                   new ThreadPoolExecutor.CallerRunsPolicy());
}

在这个类中,我还有一个 DAO,我将它传递给我的 Runnable ( FooWorker ),如下所示:
@Override
public void addTask(FooRecord record) {
    if (pool == null) {
        throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
    }
    pool.execute(new FooWorker(context, calculator, dao, record));
}
FooWorker 通过 record 通过状态机运行 calculator(唯一的非单例),然后通过 dao 将转换发送到数据库,如下所示:
public void run() {
    calculator.calculate(record);
    dao.save(record);
}

一旦我的主线程完成创建新任务,我会尝试等待以确保所有线程成功完成:
while (pool.getActiveCount() > 0) {
    recordHandler.awaitTermination(terminationTimeout,
                                   terminationTimeoutUnit);
}

我从输出日志中看到的(由于线程可能不可靠)是 getActiveCount() 过早地返回零,并且 while() 循环正在退出,而我的最后一个线程仍在打印 calculator 的输出。

注意我也试过调用 pool.shutdown() 然后使用 awaitTermination 但是下次我的工作运行时池仍然关闭。

我唯一的猜测是,在线程内部,当我将数据发送到 dao 时(因为它是 Spring 在主线程中创建的单例......),java 正在考虑线程处于非 Activity 状态,因为(我假设)它正在处理/等待主线程。

直觉上,仅基于我所看到的,这是我的猜测。但是……真的是这样吗?有没有办法“做对”而不在 run() 的顶部放置一个手动递增的变量,并在末尾递减以跟踪线程数?

如果答案是“不要传入 dao”,那么我是不是必须为每个线程“新建”一个 DAO?我的过程已经是一个(美丽的、高效的)野兽,但这真的很糟糕。

最佳答案

作为 the JavaDoc of getActiveCount states ,它是一个近似值:您不应以此为基础做出任何主要的业务逻辑决策。

如果你想等待所有计划任务完成,那么你应该简单地使用

pool.shutdown();
pool.awaitTermination(terminationTimeout, terminationTimeoutUnit);

如果你需要等待一个特定的任务完成,你应该使用 submit() 而不是 execute() 然后检查 Future 对象是否完成(如果你想非阻塞地使用 isDone() 或者简单地调用 get() ,它会阻塞直到任务已经完成了)。

关于java - ThreadPoolExecutor 的 getActiveCount(),我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/7271541/

10-11 02:21