概述

先了解一下 netty 大概框架图 ,可以看到客户端的创建和服务端最大的区别 - 服务端传入两个 EventLoopGroup,客户端传入一个 EventLoopGroup - channel 的类型也不同,服务端传入的是 NioServerSocketChannel ,客户端传入的是 NioSocketChannel - 服务端存在 childHandler 的设置,客户端没有,

客户端连接过程 : - 和服务端一样先创建 EventLoopGroup (只有一个,内部多个保持多个EventLoop线程,执行处理事务) - connect 方法 ,和服务端一样,使用group里的EventLoop创建一个 channel ,然后注册到 select 中去(这个过程都在channel 中进行) - (异步执行)当连接不上就会交给 EventLoop线程中执行监听的任务。 - 而一旦监听到了就交给 channel 执行。 - selectKey 可以attack一个object,刚好可以用来放channel ,然后在某个线程监听到某个实现的时候再把 channel 拿出来用

源码分析

实际上客户端只有一个 Reactor . 那么重点的逻辑就到了 connect 那里

    /**
     * Connect a {@link Channel} to the remote peer.
     */
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }
        validate();
        return doConnect(remoteAddress, localAddress);
    }

    /**
     * @see {@link #connect()}
     */
    private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    	//创建channel ,注册到 select , initAndRegister方法是父类的方法我们在分析服务端
    	//的时候已经分析过了
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        //一开始就连接上了,
        if (regFuture.cause() != null) {
            return regFuture;
        }

        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
        	//链路成功后,异步连接 TCP
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }

        return promise;
    }


    private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        // 到了执行连接的操作就转到了Netty 的 NIO线程执行,此刻客户端返回,连接异步执行。
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                    	//没传 localAddress 会传到 TailHandler的connect方法
                        channel.connect(remoteAddress, promise);
                    } else {
                    	//正常情况下到 HeaderHandler的connect 方法
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

我们看一下 HeaderHandler的connect 方法。
       @Override
       public void connect(
               ChannelHandlerContext ctx,
               SocketAddress remoteAddress, SocketAddress localAddress,
               ChannelPromise promise) throws Exception {
           //执行 HeaderHandler内的unsafe字段的 connect 方法
           unsafe.connect(remoteAddress, localAddress, promise);
       }


unsafe会执行AbstractNioChannel(这个类是NioServerSocketChannel和NioSocketChannel的共同父类)的connect 方法

       @Override
       public void connect(
               final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
           if (!ensureOpen(promise)) {
               return;
           }

           try {
               if (connectPromise != null) {
                   throw new IllegalStateException("connection attempt already made");
               }

               boolean wasActive = isActive();
               //doConnect 方法是个抽象方法
               if (doConnect(remoteAddress, localAddress)) {
                   fulfillConnectPromise(promise, wasActive);
               } else {
                   connectPromise = promise;
                   requestedRemoteAddress = remoteAddress;

                   // Schedule connect timeout.
                   int connectTimeoutMillis = config().getConnectTimeoutMillis();
                   if (connectTimeoutMillis > 0) {
                       connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                           @Override
                           public void run() {
                               ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                               ConnectTimeoutException cause =
                                       new ConnectTimeoutException("connection timed out: " + remoteAddress);
                               if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                   close(voidPromise());
                               }
                           }
                       }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                   }

                   promise.addListener(new ChannelFutureListener() {
                       @Override
                       public void operationComplete(ChannelFuture future) throws Exception {
                           if (future.isCancelled()) {
                               if (connectTimeoutFuture != null) {
                                   connectTimeoutFuture.cancel(false);
                               }
                               connectPromise = null;
                               close(voidPromise());
                           }
                       }
                   });
               }
           } catch (Throwable t) {
               if (t instanceof ConnectException) {
                   Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                   newT.setStackTrace(t.getStackTrace());
                   t = newT;
               }
               promise.tryFailure(t);
               closeIfClosed();
           }
       }

NioSocketChannel 的 doConnect 方法

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            javaChannel().socket().bind(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = javaChannel().connect(remoteAddress);
            if (!connected) {
            	//如果绑定不成功,注册连接事件
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

后续更新...

02-12 10:35