在使用Netty之前先介绍下Netty的常用API,对其有一个大概的了解。
一、EventLoop和EventLoopGroup
EventLoop如同它的名字,它是一个无限循环(Loop),在循环中不断处理接收到的事件(Event)。
Netty线程模型的基石是建立在EventLoop上的,从设计上来看,EventLoop采用了一种协同设计,它建立在两个基本的API之上:Concurrent和Channel,也就是并发和网络。并发是因为它采用了线程池来管理大量的任务,并且这些任务可以并发的执行。其继承了EventExecutor接口,而EventExecutor就是一个事件的执行器。另外为了与Channel的事件进行交互,EventLoop继承了EventLoopGroup接口。一个详细的EventLoop类继承层次结构如下:
一个Netty服务端启动时,通常会有两个NioEventLoopGroup:一个是监听线程组,主要是监听客户端请求,另一个是工作线程组,主要是处理与客户端的数据通讯。
Netty客户端只有一个NioEventLoopGroup,就是用来处理与服务端通信的线程组。
NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。NioEventLoopGroup的类图如下:
二、Bootstrap or ServerBootstrap
ServerBootStrap是Netty服务端启动配置类,BootStrap是Netty客户端启动配置类。
1. ServerBootstrap
- 绑定线程组,设置react模式的主线程池 以及 IO 操作线程池
group(bossGroup, workerGroup)
- channel(Class<? extends C> channelClass)
设置通讯模式,调用的是实现io.netty.channel.Channel接口的类。如:NioSocketChannel、NioServerSocketChannel,服务端一般可以选NioServerSocketChannel。
- option / handler / attr 方法
- option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;
- handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;对于客户端的SocketChannel,主要是用来处理 业务操作;
- attr: 设置通道的属性;
option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。
- childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)
对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。
因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。
2. BootStrap
- 绑定线程组,设置IO操作线程池
group(workGroup)
- channel(Class<? extends C> channelClass)
设置通讯模式,调用的是实现io.netty.channel.Channel接口的类。如:NioSocketChannel、NioServerSocketChannel,客户端一般可以选NioSocketChannel。
3. ChannelOption
ChannelOption的各种属性在套接字选项中都有对应,下面简单的总结一下ChannelOption的含义已及使用的场景。
(1) ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。
(2) ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用;比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。
(3) ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
(4) ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。
(5) ChannelOption.SO_LINGER
ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送
(6) ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
三、ChannelInitializer
ChannelInitializer的类图:
ChannelInitializer继承于ChannelInboundHandler接口。
ChannelInitializer是一个抽象类,不能直接使用。
1. 抽象方法 initChannel
ChannelInitializer的实现类必须要重写这个方法,这个方法在Channel被注册到EventLoop的时候会被调用
2. ChannelInitializer的主要目的是为程序员提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作。ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。
四、ChannelPipeline
ChannelPipeline = Channel + Pipeline,也就是说首先它与Channel绑定,然后它是起到类似于管道的作用:字节流在ChannelPipeline上流动,流动的过程中被ChannelHandler修饰,最终输出。
ChannelPipeline类图:
ChannelPipeline只有两个子类,直接一起放上来好了,其中EmbeddedChannelPipeline主要用于测试。
每个channel内部都会持有一个ChannelPipeline对象pipeline.
pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。
ChannelPipeline可以理解为ChannelHandler的容器,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。
五、ChannelHandler
ChannelHandler类似于Servlet的Filter过滤器,负责对I/O事件或者I/O操作进行拦截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。基于ChannelHandler接口,用户可以方便地进行业务逻辑定制,例如打印日志、统一封装异常信息、性能统计和消息编解码等。
ChannelHandler支持注解,目前支持的注解有两种。
- @Sharable:多个ChannelPipeline共用同一个ChannelHandler;
- @Skip:被Skip注解的方法不会被调用,直接被忽略。
1. ChannelInboundHandlerAdapter
ChannelInboundHandlerAdapter是ChannelInboundHandler的一个简单实现,默认情况下不会做任何处理,只是简单的将操作通过fire*方法传递到ChannelPipeline中的下一个ChannelHandler中让链中的下一个ChannelHandler去处理。
需要注意的是信息经过channelRead方法处理之后不会自动释放(因为信息不会被自动释放所以能将消息传递给下一个ChannelHandler处理)。
(1) SimpleChannelInboundHandler
SimpleChannelInboundHandler支持泛型的消息处理,默认情况下消息处理完将会被自动释放,无法提供fire*方法传递给ChannelPipeline中的下一个ChannelHandler,如果想要传递给下一个ChannelHandler需要调用ReferenceCountUtil#retain方法。
一般用netty来发送和接收数据都会继承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter这两个抽象类。
其实用这两个抽象类是有讲究的,在客户端的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter。
最主要的区别就是SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release。
2. ChannelOutboundHandlerAdapter
表示出去的动作,监听自己的IO操作,比如connect,bind等,在重写这个Adapter的方法时,记得执行super.xxxx,否则动作无法执行。
Netty 中的事件分为inbound 事件和outbound 事件。inbound 事件通常由I/O线程触发,例如TCP 链路建立事件、链路关闭事件、读事件、异常通知事件等。Outbound 事件通常是I/O 用户主动发起的网络I/O 操作,例如用户发起的连接操作、绑定操作、消息发送等操作。
我们常用的inbound事件有:
- channelRegistered(ChannelHandlerContext) //channel注册事件
- channelActive(ChannelHandlerContext)//通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用
- exceptionCaught(ChannelHandlerContext, Throwable)//出错时会触发,做一些错误处理
- userEventTriggered(ChannelHandlerContext, Object)//用户自定义事件
- channelRead(ChannelHandlerContext, Object) //当收到对方发来的数据后,就会触发,参数msg就是发来的信息,可以是基础类型,也可以是序列化的复杂对象。
常用的outbound事件有:
- bind(ChannelHandlerContext ctx, SocketAddress localAddress,ChannelPromise promise) //服务端执行bind时,会进入到这里,我们可以在bind前及bind后做一些操作
- connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) //客户端执行connect连接服务端时进入
- write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)//发送事件
- flush(ChannelHandlerContext ctx) //刷新事件
ChannelPromise是ChannelFuture的扩展,允许设置I/O操作的结果,使ChannelFutureListener可以执行相关操作。
六、Channel
1) 通道状态主要包括:打开、关闭、连接
2) 通道主要的IO操作,读(read)、写(write)、连接(connect)、绑定(bind)。
3) 所有的IO操作都是异步的,调用诸如read,write方法后,并不保证IO操作完成,但会返回一个凭证,在IO操作成功,取消或失败后会记录在该凭证中。
4) channel有父子关系,SocketChannel是通过ServerSocketChannel接受创建的,故SocketChannel的parent()方法返回的就是ServerSocketChannel。
5) 在Channel使用完毕后,请调用close方法,释放通道占用的资源。
//返回全局唯一的channel id ChannelId id(); //返回该Channel注册的线程模型,先理解为Ractor模型的Ractor线程。 EventLoop eventLoop(); //返回该Channel由谁创建的,ServerSocketChannel返回null,SocketChannel返回创建它的ServerSocketChannel Channel parent(); //返回通道的配置信息 ChannelConfig config(); //通道是否打开 boolean isOpen(); //该通道是否已经注册在事件模型中,此处先参考Nio编程模型,一个通过需要注册在Register上 boolean isRegistered(); //通道是否激活 boolean isActive(); //通道是否支持 调用disconnect方法后,调用connect方法 ChannelMetadata metadata(); //返回绑定的地址,服务端的Channel返回监听的地址,而客户端的Channel返回连接到服务端的本地套接字。 SocketAddress localAddress(); //返回channel的远程套接字。 SocketAddress remoteAddress(); //通道的关闭凭证(许可),这里是多线程编程一种典型的设计模式,一个channle返回一个固定的 ChannelFuture closeFuture(); //是否可写,如果通道的写缓冲区未满,即返回true,表示写操作可以立即操作缓冲区,然后返回。 boolean isWritable(); Unsafe unsafe(); //返回管道 ChannelPipeline pipeline(); //返回ByteBuf内存分配器 ByteBufAllocator alloc(); //诸如newPromise,newSuccessedFuture()方法,就是返回一个凭证,用来保存通知结果的 ChannelPromise newPromise(); ChannelProgressivePromise newProgressivePromise(); ChannelFuture newSucceededFuture(); ChannelFuture newFailedFuture(Throwable cause); ChannelPromise voidPromise(); //绑定 ChannelFuture bind(SocketAddress localAddress); //连接 ChannelFuture connect(SocketAddress remoteAddress); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress); //断开连接 ChannelFuture disconnect(); //关闭,释放通道资源 ChannelFuture close(); ChannelFuture deregister(); ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise); ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); ChannelFuture disconnect(ChannelPromise promise); ChannelFuture close(ChannelPromise promise); ChannelFuture deregister(ChannelPromise promise); Channel read(); ChannelFuture write(Object msg); ChannelFuture write(Object msg, ChannelPromise promise); Channel flush(); ChannelFuture writeAndFlush(Object msg, ChannelPromise promise); ChannelFuture writeAndFlush(Object msg); interface Unsafe { RecvByteBufAllocator.Handle recvBufAllocHandle(); ChannelHandlerInvoker invoker(); SocketAddress localAddress(); SocketAddress remoteAddress(); void register(EventLoop eventLoop, ChannelPromise promise); void bind(SocketAddress localAddress, ChannelPromise promise); void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); void disconnect(ChannelPromise promise); void close(ChannelPromise promise); void closeForcibly(); void deregister(ChannelPromise promise); void beginRead(); void write(Object msg, ChannelPromise promise); void flush(); ChannelPromise voidPromise(); //返回通道的环形缓存区 ChannelOutboundBuffer outboundBuffer(); }
Channel类图:
七、Future or ChannelFuture
Netty的Future接口继承了JDK的Future
接口,同时提供了更多的方法:
public interface Future<V> extends java.util.concurrent.Future<V> { boolean isSuccess(); Throwable cause(); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); Future<V> sync() throws InterruptedException; Future<V> await() throws InterruptedException; V getNow(); }
任务成功完成后isSuccess()返回true
任务执行过程中有异常,cause()会返回异常对象
任务被取消执行,父接口方法isCancelled返回true
以上3种情况isDone()均为true:
//任务完成 if (task.isDone()) { if (task.isSuccess()) { // 成功 } else if (task.isCancelled()) { // 被取消 } else { // 异常 System.out.print(task.cause()) } }
wait和sync都会阻塞,并等待任务完成
getNow()不会阻塞,会立即返回,但任务尚未执行完成时,会返回null
addListener方法在当前Future对象中添加监听器,当任务完成时,会通知所有的监听器。
1. ChannelFuture
ChannelFuture继承了Netty的Future接口,代表 Netty channel的I/O操作的执行结果。在Netty中所有的I/O操作都是异步的,会立即返回一个代表I/O操作的结果,即ChannelFuture。
在获得执行结果时,推荐使用添加监听器,监听执行完成事件operaionCompleted,而不要使用await方法。在ChannelHandler中调用await,会造成死锁。因为ChannelHandler中的方法通常是I/O线程调用的,再调用await会造成I/O阻塞。
//错误 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.awaitUninterruptibly(); // Perform post-closure operation // ... } // 正确 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ChannelFuture future = ctx.channel().close(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { // Perform post-closure operation // ... } }); }
即使是通过添加ChannelFutureListener的方式获取执行结果,但要注意的是:回调方法operationComplete也是由I/O线程调用的,所以也不能在其中执行耗时任务。如必须,则启用线程池执行。
ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerInitializer()) .bind(8899) .sync();
bind方法是异步的,其返回值是ChannelFuture类型。需要调用sync()同步方法,等待绑定动作执行完成。
八、其他
1. ByteToMessageDecoder
2. MessageToMessageDecoder
3. LengthFieldBasedFrameDecoder
4. MessageToByteEncoder
5. MessageToMessageEncoder
6. LengthFieldPrepender