ChannelPipeline

ChannelHandler实例的列表,用于处理或者截获通道的接收和发送数据,让用户可以在ChannelPipeline中完全控制一个事件以及处理ChannelHandler和ChannelPipeline的交互。

每一个新的通道,都会创建一个新的ChannelPipeline并且附加到通道,永久性的耦合。

一个入站I/O事件,这个事件会从ChannelPipeline的第一个Handler开始一次通过,出站IO事件则从最后一个Handler开始依次通过。Handler可以处理时间并检查类型,不能处理则跳过,并将事件传递给下一个Handler。

我们找到,不能有其他IO-Thread的阻塞拉力影响整体的IO处理,比如JDBC。这时候可以通过一个EventExecutorGroup,自定义的事件会被包含在EventExecutorGroup的EventExecutor处理。

ChannelHandlerContext

通过context,ChannelHandler允许与其他ChannelHandler实现交互:

1、通知下一个ChannelHandler

2、想事件流全部通过ChannelPipeline,可以通过调用Channel和ChannelPipeline的方法:

Netty入门(3) - ChannelHandler-LMLPHP

3、想事件从ChannelPipeline的指定位置开始:

Netty入门(3) - ChannelHandler-LMLPHP

ChannelHandlerContext是线程安全的,可以在外部使用。

修改ChannelPipeline

调用ChannelHandlerContext的pipeline()方法能访问ChannelPipeline,可以运行时动态调整ChannelHandler。可以保持ChannelHandlerContext供以后使用,线程安全:

public class WriteHandler extends ChannelHandlerAdapter {

    private ChannelHandlerContext ctx;

    @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
} public void send(String msg) {
ctx.write(msg);
} }

如果ChannelHandler实例带有@Sharable注解则可以被添加到多个ChannelPipeline中,也就是说单个ChannelHandler可以有多个ChannelHandlerContext;如果没有添加该注解的Handler实例添加到多个Pipeline中则会抛异常:

@Sharable
public class NotSharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
count++;
System.out.println("channelRead(...) called the " + count + " time");
ctx.fireChannelRead(msg);
}
}

ChannelHandler及其子类

ChannelHandlerAdapter

ChannelInboundHandler / ChannelInboundHandlerAdapter 处理完消息之后不会自动释放,需要ReferenceCountUtil.release(msg);

ChannelOutboundHandler / ChannelOutboundHandlerAdapter

SimpleChannelInboundHandler<T> / SimpleChannelInboundHandler<T> 处理完消息之后自动释放

ChannelOutboundHandler所有重要方法采用ChannelPromise,如果ChannelPromise没有被通知,可能会导致其中一个ChannelFutureListener没有被通知去处理一个消息:

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {

    @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg);
promise.setSuccess();
} }

总结一下:一般自定义消息,使用编码解码器实现字节传输,使用ChannelInboundHandlerAdapter/ChannelOutboundHandlerAdapter处理事件或者状态改变,使用SimpleChannelInboundHandler/SimpleChannelOutboundHandler处理消息。

05-11 09:37
查看更多