因此,我们目前正在基于MQTT的消息传递后端中将netty 3.x升级到netty 4.1。在我们的应用程序中,我们使用定制的MQTT消息解码器和编码器。
对于我们的解码器,我目前正在使用ByteToMessageDecoder,如下所示:
public class MqttMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 2) {
return;
}
.....
.....
.....
byte[] data = new byte[msglength];
in.resetReaderIndex();
in.readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
out.add(msg);
ReferenceCountUtil.release(in);
}
}
其中
Message
是我们的自定义对象,该对象将传递到下一个ChannelHandler
的channelRead()
。如您所见,一旦我从传入的ByteBuf
对象创建了in
对象,我就完成了对它的处理。因此,由于Message
是按净值引用计数的,所以我需要通过调用ByteBuf
在此处释放in
对象是否正确?理想情况下,根据doc看来这是正确的。但是,当我这样做时,我似乎正面临着一个例外:Wed May 24 io.netty.channel.DefaultChannelPipeline:? WARN netty-workers-7 An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.channel.ChannelPipelineException: com.bsb.hike.mqtt.MqttMessageDecoder.handlerRemoved() has thrown an exception.
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:631) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:867) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.access$300(DefaultChannelPipeline.java:45) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline$9.run(DefaultChannelPipeline.java:874) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:339) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:374) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:742) [netty-all-4.1.0.Final.jar:4.1.0.Final]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:111) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.handler.codec.ByteToMessageDecoder.handlerRemoved(ByteToMessageDecoder.java:217) ~[netty-all-4.1.0.Final.jar:4.1.0.Final]
at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626) [netty-all-4.1.0.Final.jar:4.1.0.Final]
... 7 common frames omitted
这告诉我,当关闭子通道时,管道中的所有处理程序都将被依次删除。当关闭此解码器处理程序时,我们将明确释放此解码器附带的
ReferenceCountUtil.release(in)
,当调用以下方法时,这会导致ByteBuf
异常。这是
IllegalReferenceCountException
:@Override
public boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
那么释放
AbstractReferenceCountedByteBuf#release
对象而不遇到此问题的正确方法是什么?我正在使用
ByteBuf
-new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
如果您需要有关配置的更多信息,请告诉我。
编辑:
作为Ferrybig答案的附加组件,
PooledByteBufAllocator
自己处理传入的ByteToMessageDecoder#channelRead
的释放。参见ByteBuf
块-@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
如果将入站
finally
传递到管道中的下一个通道处理程序,则该ByteBuf
的引用计数将通过ByteBuf
增加,因此,如果解码器之后的下一个处理程序是您的业务处理程序(通常是情况),则需要在该位置释放该ByteBuf#retain
对象,以避免任何内存泄漏。这在docs中也提到过。 最佳答案
并非所有处理程序都要求销毁传入的bytebuf。 ByteToMessageDecoder
是其中之一。
原因是此处理程序收集多个传入的字节缓冲区,并将它们作为1个连续的字节流公开给您的应用程序,以简化编码,并且无需您自己处理这些块。
请记住,如javadoc所述,您仍然需要使用readBytes
或readSlice
手动释放您创建的所有字节缓冲区。