Netty介绍:
Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
换句话说,Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty大大简化了网络程序的开发过程比如TCP和UDP的 Socket的开发。
“快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如FTP、SMTP、HTTP、许多二进制和基于文本的传统协议,Netty在不降低开发效率、性能、稳定性、灵活性情况下,成功地找到了解决方案。
有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是Netty和它们的不同之处。答案就是Netty的哲学设计理念。Netty从第一天开始就为用户提供了用户体验最好的API以及实现设计。正是因为Netty的设计理念,才让我们得以轻松地阅读本指南并使用Netty。
接下来,我们看下Client端的代码实现:
package ruizhan.hjf.netty; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* Netty客户端的程序
* @author huangjianfei
*/
public class Client {
/*IP地址*/
static final String HOST = System.getProperty("host", "127.0.0.1");
/*端口号*/
static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765")); static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764")); public static void main(String[] args) throws Exception {
EventLoopGroup workgroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();//客户端
b.group(workgroup)
.channel(NioSocketChannel.class)//客户端 -->NioSocketChannel
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {//handler
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new ClientHandler());
}
});
//创建异步连接 可添加多个端口
ChannelFuture cf1 = b.connect(HOST, PORT1).sync();
ChannelFuture cf2 = b.connect(HOST, PORT2).sync(); //buf
//client向server端发送数据 Buffer形式
cf1.channel().writeAndFlush(Unpooled.copiedBuffer("hello netty".getBytes()));
cf2.channel().writeAndFlush(Unpooled.copiedBuffer("hello world".getBytes())); cf1.channel().closeFuture().sync();
cf2.channel().closeFuture().sync(); workgroup.shutdownGracefully();
}
}
Servler端代码实现:
package ruizhan.hjf.netty; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Netty实现的服务端程序
* @author huangjianfei
*/
public class Server
{
/*端口号*/
static final int PORT1 = Integer.parseInt(System.getProperty("port", "8765")); static final int PORT2 = Integer.parseInt(System.getProperty("port", "8764")); public static void main(String[] args)
{
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
ServerBootstrap b = null;
try{
//1:第一个线程组是用于接收Client连接的
bossGroup = new NioEventLoopGroup(); //(1)
//2:第二个线程组是用于实际的业务处理操作的
workerGroup = new NioEventLoopGroup();
//3:创建一个启动NIO服务的辅助启动类ServerBootstrap 就是对我们的Server进行一系列的配置
b = new ServerBootstrap();//(2)
//4:绑定两个线程组
b.group(bossGroup, workerGroup)
//5:需要指定使用NioServerSocketChannel这种类型的通道
.channel(NioServerSocketChannel.class)//(3) 服务端 -->NioServerSocketChannel
//6:一定要使用childHandler 去绑定具体的事件处理器
.childHandler(new ChannelInitializer<SocketChannel>() //(4) childHandler
{
@Override
protected void initChannel(SocketChannel sc) throws Exception
{
//7:将自定义的serverHandler加入到管道中去(多个)
sc.pipeline().addLast(new ServerHandler());//handler中实现真正的业务逻辑
// sc.pipeline().addLast(new ServerHandler());
// sc.pipeline().addLast(new ServerHandler());
}
})
/**
* 服务器端TCP内核模块维护两个队列,我们称之为A,B吧
* 客户端向服务端connect的时候,会发送带有SYN标志的包(第一次握手)
* 服务端收到客户端发来的SYN时,向客户端发送SYN ACK确认(第二次握手)
* 此时TCP内核模块把客户端连接加入到A队列中,最后服务端收到客户端发来的ACK时(第三次握手)
* TCP内核模块把客户端连接从A队列移到B队列,连接成功,应用程序的accept会返回
* 也就是说accept从B队列中取出完成三次握手的连接
* A队列和B队列的长度之和是backLog,当A,B队列的长度之和大于backLog时,新连接将会被TCP内核拒绝
* 所以,如果backLog过小,可能会出现accept速度跟不上,A,B队列满了,导致新的客户端无法连接,
* 要注意的是,backLog对程序支持的连接数并无影响,backLog影响的只是还没有被accept取出的连接
*/
//8:设置TCP连接的缓冲区
.option(ChannelOption.SO_BACKLOG, 128)//(5)
// .option(ChannelOption.SO_SNDBUF, 32*1024) //设置发送缓冲大小
// .option(ChannelOption.SO_RCVBUF, 32*1024) //设置接收缓冲大小
//9:保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true);//(6)
//10:绑定指定的端口 进行监听
//此处端口号先写死 也可以绑定多个端口
ChannelFuture cf2= b.bind(PORT1).sync(); // (7) ChannelFuture cf3= b.bind(PORT2).sync(); // (7) 绑定多个端口 //Thread.sleep(10000);
cf2.channel().closeFuture().sync(); //异步等待关闭
cf3.channel().closeFuture().sync(); //异步等待关闭 }catch(Exception e){
e.printStackTrace();
}finally{
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
接下来,就是真正去实现数据传输的业务逻辑层代码的实现,在这里也就是ClientHanlder和ServlerHandler:
package ruizhan.hjf.netty; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil; /**
* 客户端业务处理类
* (编写主要的业务逻辑)
* @author huangjianfei
*/
public class ClientHandler extends ChannelHandlerAdapter
{
// ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
// 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try{
//do something
//接收服务端发来的数据 ByteBuf
ByteBuf buf = (ByteBuf)msg;
//创建一个和buf一样长度的字节空数组
byte[] data = new byte[buf.readableBytes()];
//将buf中的数据读取到data数组中
buf.readBytes(data);
//将data数组惊醒包装 以String格式输出
String response = new String(data,"utf-8");
System.out.println("client :"+response); //以上代码是接收服务端发来的反馈数据// ctx.close();
}finally{
// Discard the received data silently.
ReferenceCountUtil.release(msg);
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
package ruizhan.hjf.netty; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil; /**
* 服务端业务处理类
* (编写主要的业务逻辑)
* @author huangjianfei
*/
public class ServerHandler extends ChannelHandlerAdapter
{ /**
* 每当从客户端收到新的数据时,这个方法会在收到消息时被调用
* ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放。
* 请记住处理器的职责是释放所有传递到处理器的引用计数对象。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
try{
//do something
//接收客户端发送的数据 ByteBuf
ByteBuf buf = (ByteBuf)msg;
//创建一个和buf长度一样的空字节数组
byte[] data = new byte[buf.readableBytes()];
//将buf中的数据读取到data数组中
buf.readBytes(data);
//将data数据包装成string输出
String request = new String(data,"utf-8");
System.out.println("server :"+request); //以上代码是接收客户端信息// //server端向client发送反馈数据
//如果是绑定了多个端口 那么都会进行发送
ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()))
.addListener(ChannelFutureListener.CLOSE);//添加监听 当服务端向客户端发送完数据后,关闭connect连接
/**
* ChannelFutureListener,当一个写请求完成时通知并且关闭Channel
* 加上监听 意味着服务端回送数据到客户端时 连接关闭(短连接)
* 不加监听 意味着客户端与服务端一直保持连接状态(长连接)
*/ ctx.close();
}finally{
// Discard the received data silently.
ReferenceCountUtil.release(msg);
}
} /**
* exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
* 即当Netty由于IO错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
} }
以上是Netty的基础入门实现,详见并发编程网,http://ifeve.com/netty5-user-guide/