我已经看到了很多有关Netty中分块流的问题,但是其中大多数是关于出站流而不是入站流的解决方案。
我想了解如何从 channel 获取数据并将其作为InputStream发送到我的业务逻辑,而无需先将所有数据加载到内存中。
这是我正在尝试做的事情:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private HttpServletRequest request;
private PipedOutputStream os;
private PipedInputStream is;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
this.os = new PipedOutputStream();
this.is = new PipedInputStream(os);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
this.os.close();
this.is.close();
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf body = ((HttpContent) msg).content();
if (body.readableBytes() > 0)
body.readBytes(os, body.readableBytes());
if (msg instanceof LastHttpContent) {
os.close();
}
}
}
}
然后,我有另一个处理程序,它将获取我的CustomHttpRequest并将其发送到我所谓的ServiceHandler,在这里我的业务逻辑将从InputStream中读取。
public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
@Override
public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
future = serviceHandler.handle(request, response);
...
这是行不通的,因为当我的处理程序将CustomHttpRequest转发到ServiceHandler并尝试从InputStream读取时,该线程正在阻塞,并且在我的Decoder中从未处理过HttpContent。
我知道我可以尝试为我的业务逻辑创建一个单独的线程,但是我的印象是我在这里使事情变得过于复杂。
我看着ByteBufInputStream,但是它说
因此,我认为它不适用于Chunked Http请求。另外,我看到了ChunkedWriteHandler,对于Oubound块来说似乎还不错,但是我找不到ChunkedReadHandler之类的东西。
所以我的问题是:做到这一点的最佳方法是什么?我的要求是:
-在发送ServiceHandler之前不要将数据保留在内存中;
-ServiceHandlers API应该与netty无关(这就是为什么我使用CustomHttpRequest而不是Netty的HttpRequest的原因);
更新
我已经在CustomHttpRequest上使用更具 react 性的方法来工作。现在,该请求没有向ServiceHandler提供输入流,因此它们可以读取(正在阻塞),但是CustomHttpRequest现在具有
readInto(OutputStream)
方法,该方法返回Future,并且所有的服务处理程序都将在此Outputstream被执行时执行。满满的。这是它的样子public class CustomHttpRequest {
...constructors and other methods hidden...
private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();
private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();
private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);
public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
outputStreamFuture.set(os);
return this.writeCompleteFuture;
}
ListenableFuture<Void> writeChunk(byte[] buf) {
this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
outputStreamFuture.get().write(buf);
return Futures.immediateFuture(null);
});
return lastWriteFuture;
}
void complete() {
ListenableFuture<Void> future =
Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
outputStreamFuture.get().close();
return Futures.immediateFuture(null);
});
addFinallyCallback(future, () -> {
this.writeCompleteFuture.set(null);
});
}
}
我更新的ServletRequestHandler看起来像这样:
public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {
private NettyHttpServletRequestAdaptor request;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}
@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
this.request = new CustomHttpRequest(request, ctx.channel());
out.add(this.request);
}
if (msg instanceof HttpContent) {
ByteBuf buf = ((HttpContent) msg).content();
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
this.request.writeChunk(bytes);
if (msg instanceof LastHttpContent) {
this.request.complete();
}
}
}
}
这工作得很好,但是仍然要注意,这里的所有操作都是在单个线程中完成的,对于大型数据,也许我想生成一个新线程以将该线程释放给其他 channel 。
最佳答案
您处在正确的轨道上-如果serviceHandler.handle(request, response);
调用正在进行阻止读取,则需要为其创建一个新线程。请记住,应该只有少量的Netty工作线程,因此您不应在工作线程中进行任何阻塞调用。
另一个要问的问题是,您的服务处理程序是否需要阻塞?它有什么作用?如果仍然通过网络发送数据,您是否可以通过非阻塞方式将其合并到Netty管道中?这样一来,一切都是异步的,无需阻塞调用和额外的线程。