ExecutorCompletionservice

ExecutorCompletionservice

我在ExecutorCompletionService中遇到奇怪的行为。该项目被添加到ExecutorCompletionService.submit()罚款。然后,它开始工作,并由先前提交的Callable worker线程返回。在那之后返回ExecutorCompletionService.take()再也看不到了,所以再也看不到返回更多项目的障碍了吗?我真的不确定发生了什么。我已经创建了打印行,并且可以看到它完成了Callable worker线程。一旦发生这种情况,ExecutorCompletionService.take应该准备就绪,但是在某些情况下事情会锁定,有时还好吗?

我创建了一个测试用例,如果您多次运行它,您会看到它在某些情况下会锁定并且永远不会占用任何已完成的线程

线程死锁演示

import java.util.Observable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadDeadlockDemo extends Observable implements Runnable  {

private CompletionService<String> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private int numThreadsInPool;
private BlockingQueue<String> queue;
public ThreadDeadlockDemo(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    pool = new ExecutorCompletionService<String>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    responseWorkerThread.start();
    queue = new LinkedBlockingQueue<String>();
    new Thread(this).start();
}

public ThreadDeadlockDemo()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    pool = new ExecutorCompletionService<String>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    responseWorkerThread.start();
    queue = new LinkedBlockingQueue<String>();
    new Thread(this).start();
}

public void setThreadCount(int numThreads)
{
    executor = Executors.newFixedThreadPool(numThreads);
    pool = new ExecutorCompletionService<String>(executor);
    numThreadsInPool = numThreads;
}

public void add(String info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {
        try {
            String info = queue.take();
            Callable<String> worker = new WorkerThread(info);
            System.out.println("submitting to pooler: " + info);
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            String vulnInfo = null;
            try {
                Future<String>  tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                System.out.println("info was taken from pool completed: "  + vulnInfo);
            }



        }
    }

}


}

WorkerClass:这是添加到执行程序池中并返回的线程工作程序,但在某些情况下,永远不会在ThreadlockDemos ExecutorCompletionService池中得到通知?

import java.util.concurrent.Callable;

public class WorkerThread implements Callable<String>{


String info;
WorkerThread(String info)
{
    this.info = info;
}

//@Override
public String call() throws Exception {
    System.out.println("sending vuln info: " + info);
    return info;
}


}


这是我的测试课程,只是将项目添加到队列中。这是从我的控制台打印出来的,看起来好像已经失败了。它添加到队列上就可以对其工作并返回值。但是take()从未被称为任何想法,为什么?它有时起作用,有时使我很难看清问题所在。我很想说它在Java中的错误,但我环顾四周,发现这些类没有任何问题吗?

 public class HttpSchedulerThreadedUnitTest {

ThreadDeadlockDemo scheduler;
public HttpSchedulerThreadedUnitTest(){

    setupScheduler();
    for(int i=0; i < 5;i++)
    {
        scheduler.add(i+"");
    }
}

private void setupScheduler()
{
    scheduler = new ThreadDeadlockDemo();
    scheduler.setThreadCount(1);
}

public static void main(String[] args)
{
    new HttpSchedulerThreadedUnitTest();
}


}

控制台打印:这是当WorkerThread完成时它从不从池中取出来运行
提交给池管理者:0
提交给池管理器:1
提交给池管理器:2
发送外伤信息:0
提交给池管理者:3
发送外阴信息:1
提交给池管理者:4
发送外阴信息:2
发送外阴信息:3
发送外阴信息:4

控制台打印:它实际上是在从池返回中获取项目!
提交给池管理者:0
提交给池管理器:1
提交给池管理器:2
提交给池管理者:3
提交给池管理者:4
发送外伤信息:0
信息是从池中获取的:0
发送外阴信息:1
信息是从完成的池中获取的:1
发送外阴信息:2
信息是从池中获取的:2
发送外阴信息:3
信息是从游泳池中获取的:3
发送外阴信息:4
信息是从完成的池中获取的:4

最佳答案

这是很多代码。如果您可以将其削减(删除与HTTP相关的部分等),那就太好了。我也不确定After that return the ExecutorCompletionService.take never sees it so never sees the blocking to return anymore items?是什么意思

您可以在锁定时进行线程转储,并查看哪个线程在代码的哪个点被锁定。

同时,我确实看到了一些看起来错误的代码。

while(requestQueue.isEmpty()){
            try {
                synchronized(this)
                {
                    wait();
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            }


在这里,您正在对可运行对象进行同步。这几乎总是错误的,因为可运行对象通常不会被多个线程访问。另外,您正在测试同步语句之外的条件。通常,您使用wait如下:

synchronized(lock){
    while(!condition){
        wait();
    }
}


但是,我看不到任何在可运行对象上调用通知的代码。这可能会导致程序挂起。基本上,您正在等待某些东西,但是没有人在唤醒您,因此您可以无限期地等待。通过查看发生这种情况时的线程转储,可以很容易地确定是否是导致您遇到问题的原因。

如果您使用队列,则最好的建议是将阻塞队列用于请求队列。这样,您就不必完全等待/通知业务。

10-04 10:06