首先,为了防止那些不喜欢阅读到底的人重复标记问题,我已经阅读了Producer-Consumer Logging service with Unreliable way to shutdown问题。但是它并没有完全回答问题并且答案与书本相矛盾。
在书中提供以下代码:
public class LogWriter {
private final BlockingQueue<String> queue;
private final LoggerThread logger;
private static final int CAPACITY = 1000;
public LogWriter(Writer writer) {
this.queue = new LinkedBlockingQueue<String>(CAPACITY);
this.logger = new LoggerThread(writer);
}
public void start() {
logger.start();
}
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
private class LoggerThread extends Thread {
private final PrintWriter writer;
public LoggerThread(Writer writer) {
this.writer = new PrintWriter(writer, true); // autoflush
}
public void run() {
try {
while (true)
writer.println(queue.take());
} catch (InterruptedException ignored) {
} finally {
writer.close();
}
}
}
}
现在我们应该了解如何停止此过程。我们应该停止记录,但不应跳过已提交的消息。
作者研究方法:
public void log(String msg) throws InterruptedException {
if(!shutdownRequested)
queue.put(msg);
else
throw new IllegalArgumentException("logger is shut down");
}
并像这样评论它:
这句话对我来说足够难。
我明白那个
if(!shutdownRequested)
queue.put(msg);
不是原子的,可以在关闭后将消息添加到队列中。是的,它不是很准确,但是我看不到问题。队列将被耗尽,当队列为空时,我们可以停止LoggerThread。尤其是,我不明白为什么生产者可以被阻止。
作者没有提供完整的代码,因此我无法理解所有细节。我相信这本书是社区多数成员所阅读的,并且此示例有详细的解释。
请以完整的代码示例进行说明。
最佳答案
首先要了解的是,当请求关闭时,生产者需要停止接受更多的请求,而使用者(在这种情况下为LoggerThread
)需要排空队列。您在问题中提供的代码仅说明了故事的一个方面。当shutdownRequested
为true
时,生产者拒绝任何进一步的请求。在这个例子之后,作者继续说:
首先,问题中显示的queue.take
中的LoggerThread
将无限阻塞,以使新消息在队列中可用。但是,如果我们要(优雅地)关闭LoggerThread
,我们需要确保LoggerThread
为true时,shutdownRequested
中的关闭代码有机会执行,而不是无限地被queue.take
阻止。
当作者说消费者可以耗尽队列时,他的意思是LogWritter
可以检查shutdownRequested
,如果为true,则可以调用nonblocking drainTo方法来将队列中的当前内容清空到另一个集合中而不是调用queue.take
(或调用类似的非阻塞方法)。或者,如果shutdownRequested
为false,则LogWriter
可以照常继续调用queue.take
。
这种方法的真正问题是log
方法(由生产者调用)的实现方式。由于它不是原子的,因此多个线程有可能会错过shutdownRequested
设置为true的情况。如果错过此更新的线程数大于CAPACITY
的queue
,将会发生什么情况。让我们再次看看log
方法。 (为便于说明,添加了花括号):
public void log(String msg) throws InterruptedException {
if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.
//B. At this point, shutdownRequested is set to true by client code
//C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls
//queue.drainTo to drain all existing messages in the queue instead of `queue.take`.
//D. Producers insert the new message into the queue.
queue.put(msg);//Step E
} else
throw new IllegalArgumentException("logger is shut down");
}
}
如步骤E 中所示,当
put
完成耗尽队列并退出w时,多个生产者线程可以调用LoggerThread
。在第1000个线程调用put
之前,应该没有任何问题。真正的问题是第1001个线程何时调用put
。由于队列容量仅为1000,并且LoggerThread
可能不再处于 Activity 状态或已订阅queue.take
方法,因此它将阻塞。