在使用Netty之前先介绍下Netty的常用API,对其有一个大概的了解。

一、EventLoop和EventLoopGroup

EventLoop如同它的名字,它是一个无限循环(Loop),在循环中不断处理接收到的事件(Event)。

Netty线程模型的基石是建立在EventLoop上的,从设计上来看,EventLoop采用了一种协同设计,它建立在两个基本的API之上:Concurrent和Channel,也就是并发和网络。并发是因为它采用了线程池来管理大量的任务,并且这些任务可以并发的执行。其继承了EventExecutor接口,而EventExecutor就是一个事件的执行器。另外为了与Channel的事件进行交互,EventLoop继承了EventLoopGroup接口。一个详细的EventLoop类继承层次结构如下:

Netty的常用API(二)-LMLPHP

一个Netty服务端启动时,通常会有两个NioEventLoopGroup:一个是监听线程组,主要是监听客户端请求,另一个是工作线程组,主要是处理与客户端的数据通讯。

Netty客户端只有一个NioEventLoopGroup,就是用来处理与服务端通信的线程组。

NioEventLoopGroup可以理解为一个线程池,内部维护了一组线程,每个线程负责处理多个Channel上的事件,而一个Channel只对应于一个线程,这样可以回避多线程下的数据同步问题。NioEventLoopGroup的类图如下:

Netty的常用API(二)-LMLPHP

二、Bootstrap or ServerBootstrap

Netty的常用API(二)-LMLPHP

ServerBootStrap是Netty服务端启动配置类,BootStrap是Netty客户端启动配置类。

1. ServerBootstrap

  • 绑定线程组,设置react模式的主线程池 以及 IO 操作线程池

group(bossGroup, workerGroup)

  • channel(Class<? extends C> channelClass)

设置通讯模式,调用的是实现io.netty.channel.Channel接口的类。如:NioSocketChannel、NioServerSocketChannel,服务端一般可以选NioServerSocketChannel。

  • option / handler / attr 方法

    • option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;
    • handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;对于客户端的SocketChannel,主要是用来处理 业务操作;
    • attr: 设置通道的属性;

option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

  • childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

2. BootStrap

  • 绑定线程组,设置IO操作线程池

group(workGroup)

  • channel(Class<? extends C> channelClass)

设置通讯模式,调用的是实现io.netty.channel.Channel接口的类。如:NioSocketChannel、NioServerSocketChannel,客户端一般可以选NioSocketChannel。

3. ChannelOption

ChannelOption的各种属性在套接字选项中都有对应,下面简单的总结一下ChannelOption的含义已及使用的场景。

(1) ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

(2) ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用;比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

(3) ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

(4) ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

(5) ChannelOption.SO_LINGER

ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送

(6) ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

三、ChannelInitializer

ChannelInitializer的类图:

Netty的常用API(二)-LMLPHP

ChannelInitializer继承于ChannelInboundHandler接口。

ChannelInitializer是一个抽象类,不能直接使用。

1. 抽象方法 initChannel

ChannelInitializer的实现类必须要重写这个方法,这个方法在Channel被注册到EventLoop的时候会被调用

2. ChannelInitializer的主要目的是为程序员提供了一个简单的工具,用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作。ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作。

四、ChannelPipeline

ChannelPipeline = Channel + Pipeline,也就是说首先它与Channel绑定,然后它是起到类似于管道的作用:字节流在ChannelPipeline上流动,流动的过程中被ChannelHandler修饰,最终输出。

ChannelPipeline类图:

Netty的常用API(二)-LMLPHP

ChannelPipeline只有两个子类,直接一起放上来好了,其中EmbeddedChannelPipeline主要用于测试。

每个channel内部都会持有一个ChannelPipeline对象pipeline.
pipeline默认实现DefaultChannelPipeline内部维护了一个DefaultChannelHandlerContext链表。

Netty的常用API(二)-LMLPHP

ChannelPipeline可以理解为ChannelHandler的容器,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

五、ChannelHandler

ChannelHandler类似于Servlet的Filter过滤器,负责对I/O事件或者I/O操作进行拦截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。基于ChannelHandler接口,用户可以方便地进行业务逻辑定制,例如打印日志、统一封装异常信息、性能统计和消息编解码等。

ChannelHandler支持注解,目前支持的注解有两种。

  • @Sharable:多个ChannelPipeline共用同一个ChannelHandler;
  • @Skip:被Skip注解的方法不会被调用,直接被忽略。

1. ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter是ChannelInboundHandler的一个简单实现,默认情况下不会做任何处理,只是简单的将操作通过fire*方法传递到ChannelPipeline中的下一个ChannelHandler中让链中的下一个ChannelHandler去处理。

需要注意的是信息经过channelRead方法处理之后不会自动释放(因为信息不会被自动释放所以能将消息传递给下一个ChannelHandler处理)。

(1) SimpleChannelInboundHandler

SimpleChannelInboundHandler支持泛型的消息处理,默认情况下消息处理完将会被自动释放,无法提供fire*方法传递给ChannelPipeline中的下一个ChannelHandler,如果想要传递给下一个ChannelHandler需要调用ReferenceCountUtil#retain方法。

一般用netty来发送和接收数据都会继承SimpleChannelInboundHandler和ChannelInboundHandlerAdapter这两个抽象类。

其实用这两个抽象类是有讲究的,在客户端的业务Handler继承的是SimpleChannelInboundHandler,而在服务器端继承的是ChannelInboundHandlerAdapter。

最主要的区别就是SimpleChannelInboundHandler在接收到数据后会自动release掉数据占用的Bytebuffer资源(自动调用Bytebuffer.release())。而为何服务器端不能用呢,因为我们想让服务器把客户端请求的数据发送回去,而服务器端有可能在channelRead方法返回前还没有写完数据,因此不能让它自动release。

2. ChannelOutboundHandlerAdapter

表示出去的动作,监听自己的IO操作,比如connect,bind等,在重写这个Adapter的方法时,记得执行super.xxxx,否则动作无法执行。

Netty 中的事件分为inbound 事件和outbound 事件。inbound 事件通常由I/O线程触发,例如TCP 链路建立事件、链路关闭事件、读事件、异常通知事件等。Outbound 事件通常是I/O 用户主动发起的网络I/O 操作,例如用户发起的连接操作、绑定操作、消息发送等操作。

我们常用的inbound事件有:

  • channelRegistered(ChannelHandlerContext) //channel注册事件
  • channelActive(ChannelHandlerContext)//通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用
  • exceptionCaught(ChannelHandlerContext, Throwable)//出错时会触发,做一些错误处理
  • userEventTriggered(ChannelHandlerContext, Object)//用户自定义事件
  • channelRead(ChannelHandlerContext, Object) //当收到对方发来的数据后,就会触发,参数msg就是发来的信息,可以是基础类型,也可以是序列化的复杂对象。

常用的outbound事件有:

  • bind(ChannelHandlerContext ctx, SocketAddress localAddress,ChannelPromise promise) //服务端执行bind时,会进入到这里,我们可以在bind前及bind后做一些操作
  • connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) //客户端执行connect连接服务端时进入
  • write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)//发送事件
  • flush(ChannelHandlerContext ctx) //刷新事件

ChannelPromise是ChannelFuture的扩展,允许设置I/O操作的结果,使ChannelFutureListener可以执行相关操作。

六、Channel

1) 通道状态主要包括:打开、关闭、连接
2) 通道主要的IO操作,读(read)、写(write)、连接(connect)、绑定(bind)。
3) 所有的IO操作都是异步的,调用诸如read,write方法后,并不保证IO操作完成,但会返回一个凭证,在IO操作成功,取消或失败后会记录在该凭证中。
4) channel有父子关系,SocketChannel是通过ServerSocketChannel接受创建的,故SocketChannel的parent()方法返回的就是ServerSocketChannel。
5) 在Channel使用完毕后,请调用close方法,释放通道占用的资源。

    //返回全局唯一的channel id
    ChannelId id();
    //返回该Channel注册的线程模型,先理解为Ractor模型的Ractor线程。
    EventLoop eventLoop();
    //返回该Channel由谁创建的,ServerSocketChannel返回null,SocketChannel返回创建它的ServerSocketChannel
    Channel parent();
    //返回通道的配置信息
    ChannelConfig config();
    //通道是否打开
    boolean isOpen();
    //该通道是否已经注册在事件模型中,此处先参考Nio编程模型,一个通过需要注册在Register上
    boolean isRegistered();
    //通道是否激活
    boolean isActive();
    //通道是否支持 调用disconnect方法后,调用connect方法
    ChannelMetadata metadata();
    //返回绑定的地址,服务端的Channel返回监听的地址,而客户端的Channel返回连接到服务端的本地套接字。
    SocketAddress localAddress();
    //返回channel的远程套接字。
    SocketAddress remoteAddress();
    //通道的关闭凭证(许可),这里是多线程编程一种典型的设计模式,一个channle返回一个固定的
    ChannelFuture closeFuture();
    //是否可写,如果通道的写缓冲区未满,即返回true,表示写操作可以立即操作缓冲区,然后返回。
    boolean isWritable();
    Unsafe unsafe();
    //返回管道
    ChannelPipeline pipeline();
    //返回ByteBuf内存分配器
    ByteBufAllocator alloc();
    //诸如newPromise,newSuccessedFuture()方法,就是返回一个凭证,用来保存通知结果的
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
    //绑定
    ChannelFuture bind(SocketAddress localAddress);
    //连接
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    //断开连接
    ChannelFuture disconnect();
    //关闭,释放通道资源
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    Channel read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    Channel flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        ChannelHandlerInvoker invoker();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        //返回通道的环形缓存区
        ChannelOutboundBuffer outboundBuffer();
    }

Channel类图:

Netty的常用API(二)-LMLPHP

七、Future or ChannelFuture

Netty的Future接口继承了JDK的Future接口,同时提供了更多的方法:

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();
    Throwable cause();
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
    Future<V> sync() throws InterruptedException;
    Future<V> await() throws InterruptedException;
    V getNow();
}

任务成功完成后isSuccess()返回true
任务执行过程中有异常,cause()会返回异常对象
任务被取消执行,父接口方法isCancelled返回true
以上3种情况isDone()均为true:

//任务完成
 if (task.isDone()) {
    if (task.isSuccess()) {
        // 成功
    } else if (task.isCancelled()) {
        // 被取消
    } else {
        // 异常
        System.out.print(task.cause())
    }
 }

wait和sync都会阻塞,并等待任务完成
getNow()不会阻塞,会立即返回,但任务尚未执行完成时,会返回null
addListener方法在当前Future对象中添加监听器,当任务完成时,会通知所有的监听器。

1. ChannelFuture

ChannelFuture继承了Netty的Future接口,代表 Netty channel的I/O操作的执行结果。在Netty中所有的I/O操作都是异步的,会立即返回一个代表I/O操作的结果,即ChannelFuture。

在获得执行结果时,推荐使用添加监听器,监听执行完成事件operaionCompleted,而不要使用await方法。在ChannelHandler中调用await,会造成死锁。因为ChannelHandler中的方法通常是I/O线程调用的,再调用await会造成I/O阻塞。

 //错误
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ChannelFuture future = ctx.channel().close();
   future.awaitUninterruptibly();
   // Perform post-closure operation
   // ...
 }

 // 正确
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {
   ChannelFuture future = ctx.channel().close();
   future.addListener(new ChannelFutureListener() {
       public void operationComplete(ChannelFuture future) {
           // Perform post-closure operation
           // ...
       }
   });
 }

即使是通过添加ChannelFutureListener的方式获取执行结果,但要注意的是:回调方法operationComplete也是由I/O线程调用的,所以也不能在其中执行耗时任务。如必须,则启用线程池执行。

ChannelFuture channelFuture = serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerInitializer())
                .bind(8899)
                .sync();

bind方法是异步的,其返回值是ChannelFuture类型。需要调用sync()同步方法,等待绑定动作执行完成。

八、其他

1. ByteToMessageDecoder

2. MessageToMessageDecoder

3. LengthFieldBasedFrameDecoder

4. MessageToByteEncoder

5. MessageToMessageEncoder

6. LengthFieldPrepender

05-06 04:26