概述
先了解一下 netty 大概框架图 ,可以看到客户端的创建和服务端最大的区别 - 服务端传入两个 EventLoopGroup,客户端传入一个 EventLoopGroup - channel 的类型也不同,服务端传入的是 NioServerSocketChannel ,客户端传入的是 NioSocketChannel - 服务端存在 childHandler 的设置,客户端没有,
客户端连接过程 : - 和服务端一样先创建 EventLoopGroup (只有一个,内部多个保持多个EventLoop线程,执行处理事务) - connect 方法 ,和服务端一样,使用group里的EventLoop创建一个 channel ,然后注册到 select 中去(这个过程都在channel 中进行) - (异步执行)当连接不上就会交给 EventLoop线程中执行监听的任务。 - 而一旦监听到了就交给 channel 执行。 - selectKey 可以attack一个object,刚好可以用来放channel ,然后在某个线程监听到某个实现的时候再把 channel 拿出来用
源码分析
实际上客户端只有一个 Reactor . 那么重点的逻辑就到了 connect 那里
/** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doConnect(remoteAddress, localAddress); } /** * @see {@link #connect()} */ private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { //创建channel ,注册到 select , initAndRegister方法是父类的方法我们在分析服务端 //的时候已经分析过了 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); //一开始就连接上了, if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise = channel.newPromise(); if (regFuture.isDone()) { //链路成功后,异步连接 TCP doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } else { regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doConnect0(regFuture, channel, remoteAddress, localAddress, promise); } }); } return promise; } private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // 到了执行连接的操作就转到了Netty 的 NIO线程执行,此刻客户端返回,连接异步执行。 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { if (localAddress == null) { //没传 localAddress 会传到 TailHandler的connect方法 channel.connect(remoteAddress, promise); } else { //正常情况下到 HeaderHandler的connect 方法 channel.connect(remoteAddress, localAddress, promise); } promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
我们看一下 HeaderHandler的connect 方法。
@Override public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { //执行 HeaderHandler内的unsafe字段的 connect 方法 unsafe.connect(remoteAddress, localAddress, promise); }
unsafe会执行AbstractNioChannel(这个类是NioServerSocketChannel和NioSocketChannel的共同父类)的connect 方法
@Override public void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } try { if (connectPromise != null) { throw new IllegalStateException("connection attempt already made"); } boolean wasActive = isActive(); //doConnect 方法是个抽象方法 if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { if (t instanceof ConnectException) { Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress); newT.setStackTrace(t.getStackTrace()); t = newT; } promise.tryFailure(t); closeIfClosed(); } }
NioSocketChannel 的 doConnect 方法
@Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { javaChannel().socket().bind(localAddress); } boolean success = false; try { boolean connected = javaChannel().connect(remoteAddress); if (!connected) { //如果绑定不成功,注册连接事件 selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
后续更新...