首先,为了防止那些不喜欢阅读到底的人重复标记问题,我已经阅读了Producer-Consumer Logging service with Unreliable way to shutdown问题。但是它并没有完全回答问题并且答案与书本相矛盾。

在书中提供以下代码:

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

现在我们应该了解如何停止此过程。我们应该停止记录,但不应跳过已提交的消息。

作者研究方法:
public void log(String msg) throws InterruptedException {
     if(!shutdownRequested)
           queue.put(msg);
     else
           throw new IllegalArgumentException("logger is shut down");
 }

并像这样评论它:



这句话对我来说足够难。

我明白那个
 if(!shutdownRequested)
           queue.put(msg);

不是原子的,可以在关闭后将消息添加到队列中。是的,它不是很准确,但是我看不到问题。队列将被耗尽,当队列为空时,我们可以停止LoggerThread。尤其是,我不明白为什么生产者可以被阻止

作者没有提供完整的代码,因此我无法理解所有细节。我相信这本书是社区多数成员所阅读的,并且此示例有详细的解释。

请以完整的代码示例进行说明。

最佳答案

首先要了解的是,当请求关闭时,生产者需要停止接受更多的请求,而使用者(在这种情况下为LoggerThread)需要排空队列。您在问题中提供的代码仅说明了故事的一个方面。当shutdownRequestedtrue时,生产者拒绝任何进一步的请求。在这个例子之后,作者继续说:



首先,问题中显示的queue.take中的LoggerThread将无限阻塞,以使新消息在队列中可用。但是,如果我们要(优雅地)关闭LoggerThread,我们需要确保LoggerThread为true时,shutdownRequested中的关闭代码有机会执行,而不是无限地被queue.take阻止。

当作者说消费者可以耗尽队列时,他的意思是LogWritter可以检查shutdownRequested,如果为true,则可以调用nonblocking drainTo方法来将队列中的当前内容清空到另一个集合中而不是调用queue.take(或调用类似的非阻塞方法)。或者,如果shutdownRequested为false,则LogWriter可以照常继续调用queue.take

这种方法的真正问题是log方法(由生产者调用)的实现方式。由于它不是原子的,因此多个线程有可能会错过shutdownRequested设置为true的情况。如果错过此更新的线程数大于CAPACITYqueue,将会发生什么情况。让我们再次看看log方法。 (为便于说明,添加了花括号):

public void log(String msg) throws InterruptedException {
     if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.

           //B. At this point, shutdownRequested is set to true by client code
           //C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls
           //queue.drainTo to drain all existing messages in the queue instead of `queue.take`.
           //D. Producers insert the new message into the queue.
           queue.put(msg);//Step E
     } else
           throw new IllegalArgumentException("logger is shut down");
     }
}

步骤E 中所示,当put完成耗尽队列并退出w时,多个生产者线程可以调用LoggerThread。在第1000个线程调用put之前,应该没有任何问题。真正的问题是第1001个线程何时调用put。由于队列容量仅为1000,并且LoggerThread可能不再处于 Activity 状态或已订阅queue.take方法,因此它将阻塞。

10-07 18:11