我们有一个异步servlet,它从Jetty产生以下警告日志:

java.io.IOException: Closed while Pending/Unready

启用调试日志后,我得到以下堆栈跟踪:
WARN [jetty-25948] (HttpOutput.java:278)   - java.io.IOException: Closed while Pending/Unready
DEBUG [jetty-25948] (HttpOutput.java:279)   -
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:277) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:488) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:708) [jetty-util.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:626) [jetty-util.jar:9.4.8.v20171121]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

它并没有太大帮助。

仅在Jetty调用我们的onTimeout()AsyncListener方法之后才发出警告。 QA有时可以通过在客户端应用程序上使用kill -9来重现它。

如何使用示例Servlet客户端代码重现此警告?我想在比我们的生产代码更简单的环境中理解此问题,以便以后可以修复生产。样本servlet应该如何表现?是否可以在同一JUnit测试中使用Apache Commons HttpClient客户端来重现它? (这对于编写集成测试而不需要像kill -9这样的复杂网络黑客来说非常有用。)

我已经尝试了一些方法来实现示例异步servlet和客户端,但均未成功。我认为附加此代码不会有太大帮助,但是如果有兴趣的人可以这样做。

jetty 版本:9.4.8.v20171121

更新(2018-06-27):

反映@Joakim Erdfelt的有用答案,我在我们的代码中未找到任何close()调用,但发现可疑的同步丢失。这是异步轮询servlet的基础:

public class QueuePollServlet extends HttpServlet {

    public QueuePollServlet() {
    }

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(30_000);
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }

    private static class QueueWriteListener implements AsyncListener, WriteListener {

        private final AsyncContext asyncContext;
        private final ServletOutputStream output;

        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }

        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }

        private synchronized void writeImpl() throws IOException {
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }

        private void completeImpl() {
            asyncContext.complete();
        }

        public void dataArrived() {
            try {
                writeImpl();
            } catch (IOException e) {
                ...
            }
        }

        public void noMoreBuffers() {
            completeImpl();
        }

        @Override
        public void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }

        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }


        ...
    }
}

可能的比赛条件:
  • DataFeederThread:调用dataArrived()-> writeImpl(),然后得到output.isReady()true
  • Jetty会调用onTimeout()来完成上下文。
  • DataFeederThread:在while循环中调用output.write(),但找到了完整的上下文。

  • 这种情况可能引起Closed while Pending/Unready警告,还是其他问题?
    我认为制作completeImpl() synchronized可以解决问题,还是还有其他需要注意的地方?

    更新(2018-06-28):

    我们在onError中也有类似的QueueWriteListener实现,如下所示:

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }
    

    无论如何,在onError日志消息周围没有Closed while Pending/Unready错误日志(每条日志都在两个小时的时间范围内),只有类似我们DataFeederThread中的以下EOF:

    DEBUG [DataFeederThread] (HttpOutput.java:224) -
    org.eclipse.jetty.io.EofException: null
            at org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:704) ~[jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:668) ~[jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:526) ~[jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:778) ~[jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:834) ~[jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:234) [jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:218) [jetty-server.jar:9.4.8.v20171121]
            at org.eclipse.jetty.server.HttpOutput.flush(HttpOutput.java:392) [jetty-server.jar:9.4.8.v20171121]
            at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
            at com.example.QueuePollServlet$QueueWriteListener.dataArrived()
    
    
    DEBUG [DataFeederThread] (QueuePollServlet.java:217) - messageArrived exception
    org.eclipse.jetty.io.EofException: Closed
            at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:476) ~[jetty-server.jar:9.4.8.v20171121]
            at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
            at com.example.QueuePollServlet$QueueWriteListener.dataArrived()
    

    最佳答案

    可以通过一些手动调试和简单的Closed while Pending/Unready客户端来重现curl警告。我已经用Jetty 9.4.8.v20171121和Jetty 9.4.11.v20180605测试了它。您需要两个断点,因此要在测试环境中可靠地重现并不容易。

  • The first breakpoint is in the HttpOutput.write() method right after it changes its state from READY to PENDING :
    case READY:
        if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
            continue;
        // put a breakpoint here
    
  • The second one in Response.closeOutput() :
    case STREAM:
        // put a breakpiont here
        if (!_out.isClosed())
            getOutputStream().close();
        break;
    

  • 重现步骤:
  • [JettyThread1]调用QueueWriteListener.onWritePossible(),该代码将几个字节写入输出,然后返回(因为其输入缓冲区为空)。
  • 等待onTimeout事件。
  • [JettyThread2] HttpChannelState.onTimeout()调用QueueWriteListener.onTimeout(),后者调用asyncContext.complete()
  • [JettyThread2] HttpChannelState.onTimeout()在异步超时后安排调度。
  • [JettyThread2]在第二个断点处暂停
  • [DFT] DataFeederThread调用writeImpl()
  • [DFT] DataFeederThread调用HttpOutput.write()(这是输出流)
  • [DFT] HttpOutput.write()将其状态从READY更改为PENDING
  • [DFT]由于
  • 上方的断点,因此DataFeederThread在此处暂停
  • [JettyThread2]调度的调度将关闭输出流并产生Closed while Pending/Unready警告。

  • 因此,实际上是Jetty关闭了此(Jetty 9.4.8.v20171121)堆栈上的输出流:
    Thread [jetty-19] (Suspended)
        Response.closeOutput() line: 1043
        HttpChannelOverHttp(HttpChannel).handle() line: 488
        HttpChannelOverHttp(HttpChannel).run() line: 293
        QueuedThreadPool.runJob(Runnable) line: 708
        QueuedThreadPool$2.run() line: 626
        Thread.run() line: 748
    

    在监听器中使onTimeout() synchronized(以及writeImpl()也是synchronized)无济于事,因为计划的关闭仍然可以与writeImpl(来自DataFeederThread)重叠。考虑这种情况:
  • [JettyThread1]调用QueueWriteListener.onWritePossible(),该代码将几个字节写入输出,然后返回(因为其输入缓冲区为空)。
  • 等待onTimeout事件。
  • [JettyThread2] HttpChannelState.onTimeout()调用QueueWriteListener.onTimeout(),后者调用asyncContext.complete()
  • [DFT] DataFeederThread调用writeImpl()(由于onTimeout尚未完成,因此已被阻止)
  • [JettyThread2] QueueWriteListener.onTimeout()完成
  • [DFT] writeImpl()可以运行
  • [JettyThread2] HttpChannelState.onTimeout()在异步超时后安排调度。
  • [JettyThread2]在第二个断点处暂停
  • [DFT] DataFeederThread调用HttpOutput.write()(这是输出流)
  • [DFT] HttpOutput.write()将其状态从READY更改为PENDING
  • [DFT]由于
  • 上方的断点,因此DataFeederThread在此处暂停
  • [JettyThread2]调度的调度将关闭输出流并产生Closed while Pending/Unready警告。

  • 不幸的是,在asnyContext.complete()之后,仅检查output.isReady()是不够的。由于Jetty重新打开true(请参见下面的堆栈),所以它返回HttpOutput,因此在侦听器中需要单独的标志。
    Thread [jetty-13] (Suspended (access of field _state in HttpOutput))
        HttpOutput.reopen() line: 195
        HttpOutput.recycle() line: 923
        Response.recycle() line: 138
        HttpChannelOverHttp(HttpChannel).recycle() line: 269
        HttpChannelOverHttp.recycle() line: 83
        HttpConnection.onCompleted() line: 424
        HttpChannelOverHttp(HttpChannel).onCompleted() line: 695
        HttpChannelOverHttp(HttpChannel).handle() line: 493
        HttpChannelOverHttp(HttpChannel).run() line: 293
        QueuedThreadPool.runJob(Runnable) line: 708
        QueuedThreadPool$2.run() line: 626
        Thread.run() line: 748
    

    此外,当输出仍处于关闭状态(循环/重新打开之前)时,isReady()也返回true。相关问题:isReady() returns true in closed state - why?

    最终实现类似于以下内容:

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setBufferSize(4096);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(5_000); // millis
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }
    
    private static class QueueWriteListener implements AsyncListener, WriteListener {
    
        private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);
    
        private final AsyncContext asyncContext;
        private final ServletOutputStream output;
    
        @GuardedBy("this")
        private boolean completed = false;
    
        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }
    
        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }
    
        private synchronized void writeImpl() throws IOException {
            if (completed) {
                return;
            }
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }
    
        private synchronized void completeImpl() {
            // also stops DataFeederThread to call bufferArrived
            completed = true;
            asyncContext.complete();
        }
    
        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }
    
        public void dataArrived() {
            try {
                writeImpl();
            } catch (RuntimeException | IOException e) {
                ...
            }
        }
    
        public void noMoreData() {
            completeImpl();
        }
    
        @Override
        public synchronized void onComplete(final AsyncEvent event) throws IOException {
            completed = true; // might not needed but does not hurt
        }
    
        @Override
        public synchronized void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }
    
        @Override
        public void onError(final AsyncEvent event) throws IOException {
            logger.error("onError", event.getThrowable());
        }
    
        ...
    }
    

    更新2018-08-01 :实际上,它不能完全解决警告,请参阅:“Closed while Pending/Unready” warnings from Jetty

    10-05 19:39