我正在尝试使用Netty在Java中实现TCP服务器。我能够正确处理长度小于1024的消息,但是当我收到大于1024的消息时,只能看到部分消息。
我做了一些研究,发现我应该实现replayingdecoder,但是我不明白如何实现解码方法
我的消息使用JSON
Netty版本4.1.27
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
我的服务器设置
EventLoopGroup group;
group = new NioEventLoopGroup(this.numThreads);
try {
ServerBootstrap serverBootstrap;
RequestHandler requestHandler;
ChannelFuture channelFuture;
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(group);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.localAddress(new InetSocketAddress("::", this.port));
requestHandler = new RequestHandler(this.responseManager, this.logger);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(requestHandler);
}
});
channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
}
catch(Exception e){
this.logger.info(String.format("Unknown failure %s", e.getMessage()));
}
finally {
try {
group.shutdownGracefully().sync();
}
catch (InterruptedException e) {
this.logger.info(String.format("Error shutting down %s", e.getMessage()));
}
}
我当前的请求处理程序
package me.chirag7jain.Response;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
public class RequestHandler extends ChannelInboundHandlerAdapter {
private ResponseManager responseManager;
private Logger logger;
public RequestHandler(ResponseManager responseManager, Logger logger) {
this.responseManager = responseManager;
this.logger = logger;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf;
String data, hostAddress;
byteBuf = (ByteBuf) msg;
data = byteBuf.toString(CharsetUtil.UTF_8);
hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (!data.isEmpty()) {
String reply;
this.logger.info(String.format("Data received %s from %s", data, hostAddress));
reply = this.responseManager.reply(data);
if (reply != null) {
ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
}
}
else {
logger.info(String.format("NO Data received from %s", hostAddress));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
this.logger.info(String.format("Received Exception %s", cause.getMessage()));
ctx.close();
}
最佳答案
我会在channelRead()中接受数据并将其存储在缓冲区中。从channelRead()返回之前,我将在Channel上调用read()。您可能需要根据需要记录其他数据。
当netty调用channelReadComplete()时,有片刻时间将整个缓冲区发送到您的ResponseManager。
Channel read():请求从Channel读取数据到第一个
入站缓冲区,触发
ChannelInboundHandler.channelRead(ChannelHandlerContext,Object)事件
如果已读取数据,并触发channelReadComplete事件,则
处理程序可以决定继续阅读。
您的Channel对象可通过ctx.channel()访问。
试试这个代码:
private final AttributeKey<StringBuffer> dataKey = AttributeKey.valueOf("dataBuf");
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf;
String data, hostAddress;
StringBuffer dataBuf = ctx.attr(dataKey).get();
boolean allocBuf = dataBuf == null;
if (allocBuf) dataBuf = new StringBuffer();
byteBuf = (ByteBuf) msg;
data = byteBuf.toString(CharsetUtil.UTF_8);
hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (!data.isEmpty()) {
this.logger.info(String.format("Data received %s from %s", data, hostAddress));
}
else {
logger.info(String.format("NO Data received from %s", hostAddress));
}
dataBuf.append(data);
if (allocBuf) ctx.attr(dataKey).set(dataBuf);
ctx.channel().read();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
StringBuffer dataBuf = ctx.attr(dataKey).get();
if (dataBuf != null) {
String reply;
reply = this.responseManager.reply(dataBuf.toString());
if (reply != null) {
ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
}
}
ctx.attr(dataKey).set(null);
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}