我正在尝试使用Netty在Java中实现TCP服务器。我能够正确处理长度小于1024的消息,但是当我收到大于1024的消息时,只能看到部分消息。

我做了一些研究,发现我应该实现replayingdecoder,但是我不明白如何实现解码方法

我的消息使用JSON

Netty版本4.1.27

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception


我的服务器设置

    EventLoopGroup group;

    group = new NioEventLoopGroup(this.numThreads);

    try {
        ServerBootstrap serverBootstrap;
        RequestHandler requestHandler;
        ChannelFuture channelFuture;

        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(group);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.localAddress(new InetSocketAddress("::", this.port));

        requestHandler = new RequestHandler(this.responseManager, this.logger);

        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(requestHandler);
            }
        });

        channelFuture = serverBootstrap.bind().sync();
        channelFuture.channel().closeFuture().sync();
    }
    catch(Exception e){
        this.logger.info(String.format("Unknown failure %s", e.getMessage()));
    }
    finally {
        try {
            group.shutdownGracefully().sync();
        }
        catch (InterruptedException e) {
            this.logger.info(String.format("Error shutting down %s", e.getMessage()));
        }

    }


我当前的请求处理程序

package me.chirag7jain.Response;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;

public class RequestHandler extends ChannelInboundHandlerAdapter {

private ResponseManager responseManager;
private Logger logger;

public RequestHandler(ResponseManager responseManager, Logger logger) {
    this.responseManager = responseManager;
    this.logger = logger;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf;
    String data, hostAddress;

    byteBuf = (ByteBuf) msg;
    data = byteBuf.toString(CharsetUtil.UTF_8);
    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();

    if (!data.isEmpty()) {
        String reply;

        this.logger.info(String.format("Data received %s from %s", data, hostAddress));
        reply = this.responseManager.reply(data);

        if (reply != null) {
            ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
        }
    }
    else {
        logger.info(String.format("NO Data received from %s", hostAddress));
    }
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    this.logger.info(String.format("Received Exception %s", cause.getMessage()));
    ctx.close();
}

最佳答案

我会在channelRead()中接受数据并将其存储在缓冲区中。从channelRead()返回之前,我将在Channel上调用read()。您可能需要根据需要记录其他数据。
当netty调用channelReadComplete()时,有片刻时间将整个缓冲区发送到您的ResponseManager。


  Channel read():请求从Channel读取数据到第一个
  入站缓冲区,触发
  ChannelInboundHandler.channelRead(ChannelHandlerContext,Object)事件
  如果已读取数据,并触发channelReadComplete事件,则
  处理程序可以决定继续阅读。


您的Channel对象可通过ctx.channel()访问。

试试这个代码:

private final AttributeKey<StringBuffer> dataKey = AttributeKey.valueOf("dataBuf");

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf;
    String data, hostAddress;

    StringBuffer dataBuf = ctx.attr(dataKey).get();
    boolean allocBuf = dataBuf == null;
    if (allocBuf) dataBuf = new StringBuffer();

    byteBuf = (ByteBuf) msg;
    data = byteBuf.toString(CharsetUtil.UTF_8);
    hostAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();

    if (!data.isEmpty()) {
        this.logger.info(String.format("Data received %s from %s", data, hostAddress));
    }
    else {
        logger.info(String.format("NO Data received from %s", hostAddress));
    }

    dataBuf.append(data);
    if (allocBuf) ctx.attr(dataKey).set(dataBuf);

    ctx.channel().read();
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    StringBuffer dataBuf = ctx.attr(dataKey).get();
    if (dataBuf != null) {
        String reply;

        reply = this.responseManager.reply(dataBuf.toString());

        if (reply != null) {
            ctx.write(Unpooled.copiedBuffer(reply, CharsetUtil.UTF_8));
        }
    }
    ctx.attr(dataKey).set(null);

    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

07-24 16:05