netty 实现

1. 各组件之间的关系

netty 原理-LMLPHP

每个ServerBootstrap与一个事件循环对象(一个线程)都会与一个Channel绑定,如NioServerSocketChannel

2. 如何绑定

在做bind操作时,会执行方法,register进行注册

ChannelFuture regFuture = config().group().register(channel);

关键接口及类之间的关系:

netty 原理-LMLPHP

EventLoopGroup与EventLoop及其类关系图之间形成组合模式。

@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);//选择一个线程执行器,调用register方法,绑定channel
}
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

3. 代表客户端的channel创建

注意到ServerBootstrap有两个EventLoopGroup,parent 负责代表客户端channel的分发,child负责代码客户端channel的处理。

  • Accept事件,创建代表客户端的channel
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
} allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
} int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel()); try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t); try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
} return 0;
}

注意到doReadMessages会创建代表客户端的channel,创建完成会触发,fireChannelRead 事件。

4. 代表客户端的channel分发

上一节知道,channel创建完成会触发fireChannelRead 事件。在ServerSocketChannel初始化时,会注册ServerBootstrapAcceptor 用户接收代表客户端channel并分发到child EventLoopGroup中执行。

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
} ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
} try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

5. netty线程模型

netty 原理-LMLPHP

不管是客户端还是服务端可以通过一个Channel同时读或者写,不需要阻塞,读写多路复用。

6. handler链模型

netty 原理-LMLPHP

05-20 10:16