出于倾斜的目的,我试图在Java中实现自己的线程池。以下是我实现的内容。我对此实现有几个疑问:

  • 虽然我使用的是内置Java的BlockingQueue,但Executor希望我们提供Runnable对象(通过execute方法)。但就我而言,我觉得我可以创建任何对象而不是Runnable。那么,为什么Java执行者期望Runnable,我尝试研究源代码,但还没有弄清楚。
  • 这个原始实现还有其他问题吗?

  • 请找到代码。

    公共(public)类CustomThreadPool {
    private final BlockingQueue<Runnable> blockingQueue;
    private final Worker[] workers;
    
    public CustomThreadPool(final int numOfThreads) {
        blockingQueue = new LinkedBlockingQueue<>();
        workers = new Worker[numOfThreads];
    
        for (int i = 0; i < numOfThreads; i++) {
            workers[i] = new Worker();
            workers[i].start();
        }
    }
    
    public void execute(final Runnable task) {
        blockingQueue.add(task);
    }
    
    public void shutdownImmediately() {
        for (int i = 0; i < workers.length; i++) {
            workers[i].shutdownSignal = true;
            workers[i] = null;
        }
    }
    
     private class Worker extends Thread {
        private Runnable taskToPerform = null;
        boolean shutdownSignal = false;
    
        @Override
        public void run() {
            while(true && !shutdownSignal) {
                taskToPerform = blockingQueue.poll();
                if (taskToPerform != null) {
                    taskToPerform.run();
                }
                if(shutdownSignal) {
                    break;
                }
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        final CustomThreadPool threadPool = new CustomThreadPool(5);
        for (int i = 0; i < 20; i++) {
            threadPool.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        Thread.sleep(1*1000);
        threadPool.shutdownImmediately();
    }
    

    }

    最佳答案

  • Executor希望使用Runnable或Callable,因为它在运行您提交的任务时将调用这些接口(interface)的runcall方法。
  • 在您的实现中,您不会使用BlockingQueue的阻塞方面。当队列中没有任务时,线程池线程将在while(true && !shutdownSignal)循环上持续旋转(占用cpu时间)。因为poll()方法没有阻塞。这不是实现线程池时想要的。

  • 您应该使用一种阻止方法,而不是poll()

    您可以使用poll(long timeout,TimeUnit unit)方法来获取超时参数。在这种情况下,如果您在任何线程池线程等待此调用的同时调用shutdownImmediately方法。他们将等待超时时间,民意测验将向他们返回null。他们将看到设置了shutdownSignal并退出了循环。如果不想让他们等待超时,也可以调用中断方法。
     // in run method
     try {
         taskToPerform = blockingQueue.poll(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
         break; // this thread is interrupted. Time to leave this loop
     }
    
    // in shutdownImmediately method
    workers[i].shutdownSignal = true;
    // If You don't call interrupt, Threads will wait at max 5 sec for this case.
    workers[i].interrupt();
    workers[i] = null;
    

    或者,您可以使用take()方法在队列中没有任何内容时阻止。但是在这种情况下,当您调用shutdownImmediately方法时,您必须中断可能正在等待take方法调用的线程。否则,它们将卡在take()调用中。
    // in run method
    try {
       taskToPerform = blockingQueue.take();
    } catch (InterruptedException e) {
       break; // this thread is interrupted. Time to leave this loop
    }
    
    // in shutdownImmediately method
    workers[i].shutdownSignal = true;
    workers[i].interrupt(); // this is crucial for this case
    workers[i] = null;
    

    10-02 05:08