I am using Netty 4.0.32.
I have set MaxDirectMemorySize to 256M.

My bootstrap looks like this:

bootStrap = new ServerBootstrap();
childGroup = new NioEventLoopGroup();
bootStrap.localAddress(protocolConstant.getPort());
bootStrap.channel(NioServerSocketChannel.class);
bootStrap.group(PARENTGROUP, childGroup);
bootStrap.childHandler(new MailChannelInitializer());
bootStrap.option(ChannelOption.SO_BACKLOG, BACKLOG);
bootStrap.childOption(ChannelOption.AUTO_READ, true);
bootStrap.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1 * 1024);
bootStrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 512);
bootStrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 256);
bootStrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);


My pipeline looks like this:

SslHandler -> ByteToMessageCodec (converts ByteBuf to byte[] and vice versa) -> BusinessLogicHandler

1) SslHandler for SSL support. I am using Java's SSLEngine.

2) I extend ByteToMessageCodec like this:

private class ByteConversionCodec extends ByteToMessageCodec<byte[]> {

    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out)
            throws Exception {
        if (!ctx.channel().isWritable()) {
            ctx.flush();
        }
        out.writeBytes(msg);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        // Always copy the readable bytes. in.array() would return the whole
        // backing array (ignoring readerIndex) and would not consume any input.
        byte[] arr = new byte[in.readableBytes()];
        in.readBytes(arr);
        out.add(arr);
    }

}


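If the channel is not writable when encode is called, I issue a flush request so that the channel becomes writable again. Is this correct?

3) BusinessLogicHandler hands processing off to a thread pool, which performs asynchronous processing (involving I/O) and writes back to the pipeline using the SocketChannel object. All writes originate from the thread pool. After all the writes, I call flush() so that every pending write is flushed. Each write call involves a byte[] of at most 300 bytes. All the writes together total about 20 MB.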

Thread [nioEventLoopGroup-3-1] (Suspended)
    Thread.sleep(long) line: not available [native method]
    Bits.reserveMemory(long, int) line: 651
    DirectByteBuffer.<init>(int) line: 123
    ByteBuffer.allocateDirect(int) line: 306
    PoolArena$DirectArena.newChunk(int, int, int, int) line: 645
    PoolArena$DirectArena(PoolArena<T>).allocateNormal(PooledByteBuf<T>, int, int) line: 228
    PoolArena$DirectArena(PoolArena<T>).allocate(PoolThreadCache, PooledByteBuf<T>, int) line: 212
    PoolArena$DirectArena(PoolArena<T>).allocate(PoolThreadCache, int, int) line: 132
    PooledByteBufAllocator.newDirectBuffer(int, int) line: 271
    PooledByteBufAllocator(AbstractByteBufAllocator).directBuffer(int, int) line: 155
    PooledByteBufAllocator(AbstractByteBufAllocator).directBuffer(int) line: 146
    PooledByteBufAllocator(AbstractByteBufAllocator).buffer(int) line: 83
    SslHandler.allocate(ChannelHandlerContext, int) line: 1504
    SslHandler.allocateOutNetBuf(ChannelHandlerContext, int) line: 1514
    SslHandler.wrap(ChannelHandlerContext, boolean) line: 517
    SslHandler.flush(ChannelHandlerContext) line: 500
    DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeFlush() line: 663
    DefaultChannelHandlerContext(AbstractChannelHandlerContext).flush() line: 644
    Server$MailChannelInitializer$ByteConversionCodec.encode(ChannelHandlerContext, byte[], ByteBuf) line: 134
    Server$MailChannelInitializer$ByteConversionCodec.encode(ChannelHandlerContext, Object, ByteBuf) line: 1
    ByteToMessageCodec$Encoder.encode(ChannelHandlerContext, I, ByteBuf) line: 168
    ByteToMessageCodec$Encoder(MessageToByteEncoder<I>).write(ChannelHandlerContext, Object, ChannelPromise) line: 107
    Server$MailChannelInitializer$ByteConversionCodec(ByteToMessageCodec<I>).write(ChannelHandlerContext, Object, ChannelPromise) line: 108
    DefaultChannelHandlerContext(AbstractChannelHandlerContext).invokeWrite(Object, ChannelPromise) line: 633
    AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext, Object, ChannelPromise) line: 32
    AbstractChannelHandlerContext$WriteTask(AbstractChannelHandlerContext$AbstractWriteTask).write(AbstractChannelHandlerContext, Object, ChannelPromise) line: 908
    AbstractChannelHandlerContext$WriteTask(AbstractChannelHandlerContext$AbstractWriteTask).run() line: 893
    NioEventLoop(SingleThreadEventExecutor).runAllTasks(long) line: 358
    NioEventLoop.run() line: 357
    SingleThreadEventExecutor$2.run() line: 112
    DefaultThreadFactory$DefaultRunnableDecorator.run() line: 137
    FastThreadLocalThread(Thread).run() line: 745


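As a result, I see delays in receiving the content. The thread sleeps for some time while it waits to obtain memory. Can anyone help me with this situation? Thanks.

Best answer

"If the channel is not writable when encode is called, I issue a flush request so that the channel becomes writable again. Is this correct?"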


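In my view this is not appropriate. Your encoder should not be trying to write to the channel; it should only convert the byte[] into a ByteBuf. Your BusinessLogicHandler has already made the decisions about whether to write and whether to flush.

Have you tried removing this code to see whether it resolves your problem?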

if (!ctx.channel().isWritable()) {
    ctx.flush();
}
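
As a rough illustration of why that check may fire on almost every write with the settings in the question (high water mark 512, low water mark 256, messages of up to 300 bytes), here is a small stand-alone model. It is not Netty code: WaterMarkModel and its methods are hypothetical names, and the model assumes that the channel turns unwritable once pending bytes exceed the high mark and turns writable again only after they drop below the low mark. It also ignores SslHandler buffering, TLS record overhead, and any per-message bookkeeping Netty may add.

```java
// Simplified, hypothetical model of a write buffer guarded by water marks.
// It only mimics the writability rule assumed in the text above.
class WaterMarkModel {
    private final int low;
    private final int high;
    private long pending;            // bytes queued but not yet sent
    private boolean writable = true;

    WaterMarkModel(int low, int high) {
        this.low = low;
        this.high = high;
    }

    // Queue bytes; turn unwritable once pending exceeds the high mark.
    void enqueue(int bytes) {
        pending += bytes;
        if (pending > high) {
            writable = false;
        }
    }

    // The socket accepted some bytes; turn writable again below the low mark.
    void drain(int bytes) {
        pending = Math.max(0, pending - bytes);
        if (pending < low) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }

    public static void main(String[] args) {
        WaterMarkModel channel = new WaterMarkModel(256, 512);
        int unwritableChecks = 0;
        // Ten 300-byte writes while the socket drains nothing.
        for (int i = 0; i < 10; i++) {
            if (!channel.isWritable()) {
                unwritableChecks++;  // this is where encode() would call flush()
            }
            channel.enqueue(300);
        }
        System.out.println("writes that saw an unwritable channel: " + unwritableChecks);
    }
}
```

Under these assumptions, two queued messages are enough to cross the high mark, so while the peer is slow nearly every later encode call would take the flush branch, and in the stack trace above each such flush goes through SslHandler.wrap and allocates a new buffer.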

09-28 04:32