我正在尝试使用数据包编写一个简单的TCP服务器/客户端系统。当客户端处于活动状态时,我让它向服务器发送1个数据包,服务器很好地接收它,但是随后客户端抛出异常(如下所示),我不确定为什么。客户端不应再收到任何数据。
客户
public class Client {
public static void main(String[] args) throws Exception {
new Client("localhost", 8000);
}
private Channel channel;
public Client(final String host, final int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
new Bootstrap()
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new PacketDecoder());
ch.pipeline().addLast("encoder", new PacketEncoder());
ch.pipeline().addLast(new Handler());
}
})
.connect(host, port).sync().channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
private class Handler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
System.out.println("Connected");
//channel.writeAndFlush(new SimplePacket(25));
channel.writeAndFlush(new SimplePacket(50));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Disconnected");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
服务器
public class Server {
public static void main(String[] args) throws Exception {
new Server(8000);
}
private final Set<Channel> channels = new HashSet<Channel>();
public Server(final int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new PacketDecoder());
ch.pipeline().addLast("encoder", new PacketEncoder());
ch.pipeline().addLast(new Handler());
}
});
b.bind(port)
.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("Listening on " + port);
} else {
System.out.println("Could not bind to host");
}
}
})
.sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class Handler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
System.out.println("Client connected [" + channels.size() + "]: " + ctx.channel().remoteAddress());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
channels.remove(ctx.channel());
System.out.println("Client disconnected [" + channels.size() + "]: " + ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Packet packet = (Packet) msg;
System.out.println("Received packet: " + packet.getId() + " | " + packet.toString());
}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
包解码器
public class PacketDecoder extends ByteToMessageDecoder {
private final PacketManager packetManager;
public PacketDecoder(PacketManager packetManager) {
this.packetManager = packetManager;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
while (buf.readableBytes() != 0) {
int id = buf.readInt();
if (buf.readableBytes() != 0) {
if (packetManager.isRegistered(id)) {
Packet packet = packetManager.getPacket(id);
packet.read(buf);
out.add(packet);
} else {
buf.skipBytes(buf.readableBytes());
throw new DataException("Cannot receive unregistered packet: " + id);
}
}
}
}
}
分组编码器
public class PacketEncoder extends MessageToByteEncoder<Packet> {
@Override
protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) throws Exception {
buf.writeInt(packet.getId());
packet.write(buf);
}
}
SimplePacket类
public class SimplePacket extends Packet {
private int data;
public SimplePacket(int data) {
this.data = data;
}
public SimplePacket() {
}
@Override
public void read(ByteBuf buf) {
data = buf.readInt();
}
@Override
public void write(ByteBuf buf) {
buf.writeInt(data);
}
@Override
public int getId() {
return 1000;
}
@Override
public String toString() {
return "{" + data + "}";
}
}
例外
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(78) + length(4) exceeds writerIndex(80): UnpooledUnsafeDirectByteBuf(ridx: 78, widx: 80, cap: 80)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1161)
at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:612)
at dataserver.packet.Packet.readString(Packet.java:21)
at dataserver.packet.packets.SequencePacket.read(SequencePacket.java:74)
at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:27)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
... 10 more
io.netty.handler.codec.DecoderException: dataserver.DataException: Cannot receive unregistered packet: 2
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:257)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:139)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:724)
Caused by: dataserver.DataException: Cannot receive unregistered packet: 2
at dataserver.packet.codec.PacketDecoder.decode(PacketDecoder.java:31)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:226)
... 10 more
我正在使用的新数据包(数据大小可能有所不同)
public class SequencePacket extends Packet {
private static final Map<Character, Class<? extends Object>> types = new HashMap<Character, Class<? extends Object>>();
static {
types.put('b', Byte.class);
types.put('f', Float.class);
types.put('d', Double.class);
types.put('s', Short.class);
types.put('i', Integer.class);
types.put('l', Long.class);
types.put('c', Character.class);
types.put('S', String.class);
types.put('B', Boolean.class);
}
private final List<Object> data = new ArrayList<Object>();
public SequencePacket() {
}
public SequencePacket(Object...objects) {
for (Object object : objects) {
write(object);
}
}
@Override
public void read(ByteBuf buf) {
String sequence = Packet.readString(buf).trim();
System.out.println("Sequence: " + sequence);
char[] split = sequence.toCharArray();
for (int i = 0; i < split.length; i++) {
char c = split[i];
if (!types.containsKey(c)) {
throw new DataException("Bad sequence character in " + sequence + ": " + c);
}
switch (c) {
case 'b':
data.add(buf.readByte());
break;
case 'f':
data.add(buf.readFloat());
break;
case 'd':
data.add(buf.readDouble());
break;
case 's':
data.add(buf.readShort());
break;
case 'i':
data.add(buf.readInt());
break;
case 'l':
data.add(buf.readLong());
break;
case 'c':
data.add(buf.readChar());
break;
case 'S':
data.add(Packet.readString(buf));
break;
case 'B':
data.add(buf.readBoolean());
break;
}
}
}
@Override
public void write(ByteBuf buf) {
StringBuilder sequence = new StringBuilder();
for (Object object : data) {
sequence.append(getType(object.getClass()));
}
Packet.writeString(buf, sequence.toString());
for (Object object : data) {
switch (getType(object.getClass())) {
case 'b':
buf.writeByte((Byte) object);
break;
case 'f':
buf.writeFloat((Float) object);
break;
case 'd':
buf.writeDouble((Double) object);
break;
case 's':
buf.writeShort((Short) object);
break;
case 'i':
buf.writeInt((Integer) object);
break;
case 'l':
buf.writeLong((Long) object);
break;
case 'c':
buf.writeChar((Character) object);
break;
case 'S':
Packet.writeString(buf, (String) object);
break;
case 'B':
buf.writeBoolean((Boolean) object);
break;
}
}
}
@Override
public int getId() {
return 0;
}
public SequencePacket write(Object o) {
if (!types.containsValue(o.getClass())) {
throw new DataException("Cannot add object type to sequence: " + o.getClass().getSimpleName());
}
data.add(o);
return this;
}
public byte getByte(int index) {
return (Byte) data.get(index);
}
public float getFloat(int index) {
return (Float) data.get(index);
}
public double getDouble(int index) {
return (Double) data.get(index);
}
public short getShort(int index) {
return (Short) data.get(index);
}
public int getInt(int index) {
return (Integer) data.get(index);
}
public long getLong(int index) {
return (Long) data.get(index);
}
public char getChar(int index) {
return (Character) data.get(index);
}
public String getString(int index) {
return data.get(index).toString();
}
public boolean getBoolean(int index) {
return (Boolean) data.get(index);
}
public Object getObject(int index) {
return data.get(index);
}
public boolean isByte(int index) {
return data.get(index).getClass() == Byte.class;
}
public boolean isFloat(int index) {
return data.get(index).getClass() == Float.class;
}
public boolean isDouble(int index) {
return data.get(index).getClass() == Double.class;
}
public boolean isShort(int index) {
return data.get(index).getClass() == Short.class;
}
public boolean isInt(int index) {
return data.get(index).getClass() == Integer.class;
}
public boolean isLong(int index) {
return data.get(index).getClass() == Long.class;
}
public boolean isChar(int index) {
return data.get(index).getClass() == Character.class;
}
public boolean isString(int index) {
return data.get(index).getClass() == String.class;
}
public boolean isBoolean(int index) {
return data.get(index).getClass() == Boolean.class;
}
public List<Object> getAll() {
return data;
}
public int size() {
return data.size();
}
public boolean isEmpty() {
return data.isEmpty();
}
public SequencePacket clear() {
data.clear();
return this;
}
public boolean hasIndex(int index) {
return index >= 0 && index < data.size();
}
public Class<? extends Object> getClass(int index) {
return data.get(index).getClass();
}
private char getType(Class<? extends Object> clazz) {
char c = ' ';
for (Entry<Character, Class<? extends Object>> entry : types.entrySet()) {
if (entry.getValue() == clazz) {
c = entry.getKey();
break;
}
}
if (c == ' ') {
throw new DataException("Could not find type in sequence: " + clazz.getSimpleName());
}
return c;
}
@Override
public String toString() {
StringBuilder result = new StringBuilder();
result.append("{");
if (data != null) {
for (Object object : data) {
result.append(object.toString());
result.append(", ");
}
if (result.length() > 2) {
result.setLength(result.length() - 2);
}
}
result.append("}");
return result.toString();
}
}
最佳答案
每当通道处于非活动状态时,都会调用该方法,这是从ByteToMessageDecoder继承的默认行为,要解决此问题,您必须检查PacketDecoder.decode中是否有空缓冲区,如果缓冲区为空,则只需返回即可。
发生异常2是因为在您的代码中存在以下逻辑:
创建一个SimplePacket
使SimplePacket从缓冲区读取一个整数
如果缓冲区抛出异常中剩余数据
如果没有剩余数据,则将数据包添加到列表中
而且由于您从客户端发送了两个SimplePacket,因此很可能会引发异常。要解决此问题,您必须在服务器中循环创建SimplePacket:
while(buf.readableBytes() != 0){
Packet packet = new SimplePacket();
packet.read(buf);
out.add(packet);
}
在TCP包中,已发送的数据包将被可靠地发送,并且将按照相同的顺序接收,但不必以与发送时相同的块大小接收,如果您不开始以这种方式思考,您将继续在代码中引入错误。唯一的例外是我希望:),这可能是由于您开始从下面的代码中从缓冲区读取字节而无视实际可用的字节数。仅检查缓冲区是否有数据还不够,还需要检查缓冲区是否有足够的数据。并且,直到有足够的数据,您什么都不读,您只是让数据在缓冲区中累积,直到缓冲区的大小大于或等于协议中可解析字节的最小数量为止。
while (buf.readableBytes() != 0) {
int id = buf.readInt();
if (packetManager.isRegistered(id)) {
Packet packet = packetManager.getPacket(id);
packet.read(buf);
out.add(packet);
} else {
buf.skipBytes(buf.readableBytes());
throw new DataException("Cannot receive unregistered packet: " + id);
}
}
关于java - 网路解码错误,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/21214623/