TCP以流的方式进行数据传输,上层应用协议为了对消息进行区分,通常采用以下4中方式:
- 消息长度固定,累计读取到长度综合为定长LEN的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报;
- 将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛;
- 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的分隔符;
- 通过在消息头中定义长度字段来标识消息的总长度。
DelimiterBaseFrameDecoder——分隔符解码器,FixedLengthFrameDecoder——定长解码器
下面我们采用#为分隔符进行代码练习运行。
EchoServer服务端代码
package com.decoder; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; public class EchoServer {
public void bind(int port) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.childHandler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());//创建一个分隔符,确定为结束标志
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
.addLast(new StringDecoder())
.addLast(new EchoServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if(args.length>0&&args!=null){
port = Integer.parseInt(args[0]);
}
new EchoServer().bind(port); }
}
服务端处理IO代码
package com.decoder; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class EchoServerHandler extends ChannelInboundHandlerAdapter {
int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("This is"+ ++count +" times server receive client request.");
body += "#";
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端发送消息代码
package com.decoder; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; public class EchoClient {
public void connection(int port,String host) throws InterruptedException {
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
.addLast(new StringDecoder())
.addLast(new EchoClientHandler());
//
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host,port).sync();
// 等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 8080;
if(args.length>0&&args!=null){
System.out.println(args[0]);
port = Integer.parseInt(args[0]);
}
new EchoClient().connection(port,"127.0.0.1");
}
}
客户端处理IO代码
package com.decoder; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private int count;
static final String ECHO_REQ = "hello,zuixiaoyao,welcome here!#"; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i=0;i<10;i++){
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("this is client receive msg"+ ++count +"times:【"+body+"】");
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}
运行结果
服务端
客户端
若采用定长解码器,运行上面代码看看会发生什么,我们只需要对上面服务器中解码器换为定长解码器即可,解码器最大长度设置为20,看看
修改的服务端代码如下:
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("#".getBytes());//创建一个分隔符,确定为结束标志
socketChannel.pipeline()
// .addLast(new DelimiterBasedFrameDecoder(1024,delimiter))
// 修改为定长解码器
.addLast(new FixedLengthFrameDecoder(20))
.addLast(new StringDecoder())
.addLast(new EchoServerHandler());
}
运行后如下:
服务端结果
客户端结果
我们发现所有运行返回的代码都不超过20字符。这就是按照定长解析的,但是解析的比较乱,具体的原理还需深入学习后才知道,暂时不表。
按书上的操作,输入一行超过20字符的命令请求时,只返回一个20字符定长的数据显示。