我创建了一个netty.io BootStrap,该设备从旧服务器接收流数据。服务器使用ISO-8859-1字符集发送数据。内部还有一个使用不同分隔符字节的内部“协议”:
private static final byte GS = 29; // FS ASCII char 1D (group separator)
private static final byte RS = 30; // FS ASCII char 1E (record separator)
private static final byte US = 31; // FS ASCII char 1F (unit separator)
private static final ByteProcessor GROUP_SEPARATOR_LOCATOR = value -> value != GS;
private static final ByteProcessor RECORD_SEPARATOR_LOCATOR = value -> value != RS;
private static final ByteProcessor UNIT_SEPARATOR_LOCATOR = value -> value != US;
这些ByteProcessor实例用于拆分消息。最终,每条消息都转换为其对应的对象表示形式,而keyValueMapping包含原始消息中的主要内容:
public class Update {
private final String id;
private final UpdateType updateType;
private final Map<String, byte[]> keyValueMapping;
// SOME OTHER STUFF
}
随后,所有更新都转发到所有连接的Web套接字客户端,这些客户端由单独的ServerBootStrap处理:
public void distribute(ChannelGroup recipients, Object msg) {
Update updateMsg = (Update) msg;
recipients.writeAndFlush(updateMsg);
}
当我激活Java Flight Recording并执行一些负载测试时,我意识到主要的分配热点是将初始入站消息中的值转换为ISO-8859-1字节数组的方法:
private byte[] translateValue(ByteBuf in) {
byte [] result;
if (!in.hasArray()) {
result = new byte[in.readableBytes()];
in.getBytes(in.readerIndex(), result);
} else {
result = in.array();
}
return result;
}
最初,我没有翻译ByteBuf,而是直接将它们存储在Update的keyValueMapping映射中。由于ByteBuf对象维护着一些不受保护的内部索引(读取器,写入器,标记等),因此,我害怕将这些ByteBuf封装并转发到不同的通道(请参见上面的收件人channelGroup),因此决定继续使用用byte []表示。
检查Java飞行记录的结果,我想知道是否有任何建议如何将未更改的入站数据分配到一组不同的通道而又不会过多地限制GC?从结果中学到,直接缓冲区被用于给定通道,因为创建了许多新的字节数组。
为了提供更多的上下文,我还添加了执行剩余消息翻译的代码:
while (in.readableBytes() > 0) {
ByteBuf keyAsByteBuf = nextToken(in, UNIT_SEPARATOR_LOCATOR);
String key = translateKey(keyAsByteBuf);
if (key != null) {
ByteBuf valueAsByteBuf = nextToken(in, RECORD_SEPARATOR_LOCATOR);
byte[] value = translateValue(valueAsByteBuf);
if (value.length > 0) {
mapping.put(key, value);
}
}
}
private ByteBuf nextToken(ByteBuf in, ByteProcessor locator) {
int separatorIdx = in.forEachByte(in.readerIndex(), in.readableBytes(), locator);
if (separatorIdx >= 0) {
ByteBuf token = in.readSlice(separatorIdx - in.readerIndex());
in.skipBytes(1);
return token;
}
return in.readSlice(in.readableBytes());
}
private String translateKey(ByteBuf in) {
return keyTranslator.translate(in);
}
最佳答案
嗯...实际上,您的问题不是那么简单。我将简要回答。
如果您的应用程序中不需要ByteBuf
,则无需将byte[]
转换为ByteBuf
。因此,我假设您具有下一个结构:
public class Update {
private final String id;
private final UpdateType updateType;
private final Map<String, ByteBuf> keyValueMapping;
}
这里的问题是您部分解析了
ByteBuf
。因此,您在此Java对象中包含Java对象+ ByteBuf's
。很好,您可以进一步使用这些
Update
。您的类ReferenceCounted
应该实现recipients.writeAndFlush(updateMsg)
接口。因此,当您执行DefaultChannelGroup
(假设收件人为DefaultChannelGroup
)时,netty recipients.writeAndFlush(updateMsg)
将处理对这些缓冲区的引用。那么会发生什么:
在
DefaultChannelGroup
之后,循环中的updateMsg
使用channel.writeAndFlush(safeDuplicate(message))
将safeDuplicate
发送到列表中的每个通道。 ByteBuf
是处理对netty retainedDuplicate()
的引用的特殊方法,因此您可以将同一缓冲区发送到多个接收者(它实际上使用ByteBuf
复制缓冲区)。但是,您的对象不是ByteBuf
,而是java对象。这是该方法的代码:private static Object safeDuplicate(Object message) {
if (message instanceof ByteBuf) {
return ((ByteBuf) message).retainedDuplicate();
} else if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).retainedDuplicate();
} else {
return ReferenceCountUtil.retain(message);
}
}
因此,为了正确处理
ReferenceCounted
的引用,您需要为ReferenceCountUtil.retain(message)
实现release()
。像这样:public class Update implements ReferenceCounted {
@Override
public final Update retain() {
return new Update(id, updateType, makeRetainedBuffers());
}
private Map makeRetainedBuffers() {
Map newMap = new HashMap();
for (Entry entry : keyValueMapping) {
newMap.put(entry.key, entry.value.duplicate().retain())
}
return newMap;
}
}
这只是一个伪代码。但是你应该明白这个想法。您还必须在
Update
类内实现Update
方法,并确保它始终释放其持有的缓冲区。并释放其中的所有缓冲区。我假设您已经在要调用release()
的DefaultChannelGroup
类的管道中具有编码器。另一种选择是实现自己的
safeDuplicate
。在这种情况下,您不必依赖ReferenceCounted
方法。因此,您不需要实现,但是仍然需要处理该类中的保留,手动释放。关于java - 如何有效地将入站netty.io ByteBuf消息分发到ChannelGroup?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47894916/