这是一个Java并发问题。需要完成10个工作,每个工作都有32个工作线程。工作线程将增加一个计数器。一旦计数器为32,则表示此工作已完成,然后清除计数器映射。从控制台输出中,我预计将输出10个“完成”,池大小为0,counterThread大小为0。

问题是:


大多数情况下,“池大小:0和countThreadMap大小:3”将是
打印出来。即使所有线程都消失了,但3个作业却没有
完成了。
一段时间,我可以在第27行看到nullpointerexception。我使用了ConcurrentHashMap和AtomicLong,为什么仍然有并发
例外。


谢谢

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class Test {
    final ConcurrentHashMap<Long, AtomicLong[]> countThreadMap = new ConcurrentHashMap<Long, AtomicLong[]>();
    final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    final ThreadPoolExecutor tPoolExecutor = ((ThreadPoolExecutor) cachedThreadPool);

    public void doJob(final Long batchIterationTime) {
        for (int i = 0; i < 32; i++) {
            Thread workerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (countThreadMap.get(batchIterationTime) == null) {
                        AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                        atomicThreadCountArr[0] = new AtomicLong(1);
                        atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis()); //start up time
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    } else {
                        AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                        atomicThreadCountArr[0].getAndAdd(1);
                        countThreadMap.put(batchIterationTime, atomicThreadCountArr);
                    }

                    if (countThreadMap.get(batchIterationTime)[0].get() == 32) {
                        System.out.println("done");
                        countThreadMap.remove(batchIterationTime);
                    }
                }
            });
            tPoolExecutor.execute(workerThread);
        }
    }

    public void report(){
        while(tPoolExecutor.getActiveCount() != 0){
            //
        }
        System.out.println("pool size: "+ tPoolExecutor.getActiveCount() + " and countThreadMap size:"+countThreadMap.size());
    }

    public static void main(String[] args) throws Exception {
        Test test = new Test();
        for (int i = 0; i < 10; i++) {
            Long batchIterationTime = System.currentTimeMillis();
            test.doJob(batchIterationTime);
        }

        test.report();
        System.out.println("All Jobs are done");

    }
}

最佳答案

让我们深入研究与线程相关的编程的所有错误,一个人可以做到:

Thread workerThread = new Thread(new Runnable() {
…
tPoolExecutor.execute(workerThread);


您创建了一个Thread,但没有启动它,而是将其提交给执行者。 Java API的历史性错误是让Thread在没有充分理由的情况下实现Runnable。现在,每个开发人员都应该意识到,没有理由将Thread视为Runnable。如果您不想手动start线程,请不要创建Thread。只需创建Runnable并将其传递给executesubmit

我要强调后者,因为它返回一个Future,它免费为您提供您要实现的内容:任务完成时的信息。使用invokeAll会更加轻松,它会提交一堆Callable并在完成后返回。由于您没有告诉我们有关您实际任务的任何信息,因此您不清楚是否可以让您的任务简单地实现Callable(可能返回null)而不是Runnable

如果您不能使用Callable或不想立即等待提交,则必须记住返回的Future并在以后查询它们:

static final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

public static List<Future<?>> doJob(final Long batchIterationTime) {
    final Random r=new Random();
    List<Future<?>> list=new ArrayList<>(32);
    for (int i = 0; i < 32; i++) {
        Runnable job=new Runnable() {
            public void run() {
                // pretend to do something
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(r.nextInt(10)));
            }
        };
        list.add(cachedThreadPool.submit(job));
    }
    return list;
}

public static void main(String[] args) throws Exception {
    Test test = new Test();
    Map<Long,List<Future<?>>> map=new HashMap<>();
    for (int i = 0; i < 10; i++) {
        Long batchIterationTime = System.currentTimeMillis();
        while(map.containsKey(batchIterationTime))
            batchIterationTime++;
        map.put(batchIterationTime,doJob(batchIterationTime));
    }
    // print some statistics, if you really need
    int overAllDone=0, overallPending=0;
    for(Map.Entry<Long,List<Future<?>>> e: map.entrySet()) {
        int done=0, pending=0;
        for(Future<?> f: e.getValue()) {
            if(f.isDone()) done++;
            else  pending++;
        }
        System.out.println(e.getKey()+"\t"+done+" done, "+pending+" pending");
        overAllDone+=done;
        overallPending+=pending;
    }
    System.out.println("Total\t"+overAllDone+" done, "+overallPending+" pending");
    // wait for the completion of all jobs
    for(List<Future<?>> l: map.values())
        for(Future<?> f: l)
            f.get();
    System.out.println("All Jobs are done");
}


但是请注意,如果您不需要ExecutorService来执行后续任务,则等待所有作业完成要容易得多:

cachedThreadPool.shutdown();
cachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
System.out.println("All Jobs are done");




但是,不管手动跟踪工作状态有多么不必要,让我们深入研究一下您的尝试,以便将来避免出现以下错误:

if (countThreadMap.get(batchIterationTime) == null) {


ConcurrentMap是线程安全的,但是不会将您的并发代码转换为顺序代码(这会使多线程无效)。上面的行可能同时由多达32个线程处理,所有这些发现都发现键尚不存在,因此可能会有多个线程将初始值放入映射中。

                    AtomicLong[] atomicThreadCountArr = new AtomicLong[2];
                    atomicThreadCountArr[0] = new AtomicLong(1);
                    atomicThreadCountArr[1] = new AtomicLong(System.currentTimeMillis());
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);


因此,这就是所谓的“先检查后行动”反模式。如果有多个线程要处理该代码,它们都会put其新值,并确信这是正确的做法,因为他们在执行操作之前已检查了初始条件,但对于除一个线程外的所有线程,条件在更改时均已更改动作,它们将覆盖先前的put操作的值。

                } else {
                    AtomicLong[] atomicThreadCountArr = countThreadMap.get(batchIterationTime);
                    atomicThreadCountArr[0].getAndAdd(1);
                    countThreadMap.put(batchIterationTime, atomicThreadCountArr);


由于您正在修改已经存储在映射中的AtomicInteger,因此put操作是无用的,它将把它之前检索到的数组放在原来的位置。如果没有错误,如上所述可以有多个初始值,则put操作无效。

                }

                if (countThreadMap.get(batchIterationTime)[0].get() == 32) {


同样,使用ConcurrentMap不会将多线程代码转换为顺序代码。很明显,只有最后一个线程会将原子整数更新为32(当初始竞争条件没有实现时),但不能保证所有其他线程都已经通过了此if语句。因此,在这个执行点,仍然可以有多个线程,最多可以看到所有线程,并且可以看到32的值。要么…

                    System.out.println("done");
                    countThreadMap.remove(batchIterationTime);


看到32值的线程之一可能会执行此remove操作。此时,可能仍有线程没有执行上面的if语句,现在看不到值32而是产生了NullPointerException,因为映射中不再包含应该包含AtomicInteger的数组。这是偶然发生的情况…

10-07 18:56
查看更多