前言

反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:

  1. Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器。
  2. Handlers处理器的职责:非阻塞的执行业务处理逻辑。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等

1. 单线程反应器

IO多路复用使用了NIO模型。
Reactor模式使用了IO多路复用的机制。

/反应器
class EchoReactor implements Runnable {
    Selector selector;
    ServerSocketChannel serverSocket;
    EchoReactor() throws IOException {
        //...获取选择器、开启serverSocket服务监听通道
        //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress("127.0.0.1",8080));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理.第一步,注册serverSocket(监听)的accept事件
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        //将新连接处理器作为附件,绑定到sk选择键
        // 绑定AcceptorHandler新连接处理器到selectKey;类似对象的set方法
        sk.attach(new AcceptorHandler());
    }
    //轮询和分发事件
    public void run() {
        try {
            while (! Thread.interrupted()) {
                // 返回注册在selector中等待IO操作(及有事件发生)channel的selectionKey
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    //反应器负责dispatch收到的事件
                    SelectionKey sk = it.next();
                    //分发
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    void dispatch(SelectionKey sk) {
        Runnable handler = (Runnable) sk.attachment();
        //调用之前attach绑定到选择键的handler处理器对象
        if (handler!= null) {
            handler.run();
        }
    }

    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                //处理连接
                SocketChannel channel = serverSocket.accept();
                if (channel!= null)
                // 构造方法中绑定。处理真实的逻辑
                new EchoHandler(selector, channel);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new Thread(new EchoReactor()).start();
    }
}
class EchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;

    EchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //取得选择键,再设置感兴趣的IO事件
        //将新的SocketChannel传输通道,注册到了反应器Reactor类的同一个选择器中。
        // 这样保证了Reactor类和Handler类在同一个线程中执行
        sk = channel.register(selector, 0);
        //将Handler自身作为选择键的附件
        //Channel传输通道注册完成后,将EchoHandler自身作为附件,attach到了选择键中。
        // 这样,在Reactor类分发事件(选择键)时,能执行到EchoHandler的run方法
        sk.attach(this);
        //注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    public void run() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);
                //写完后,准备开始从通道读,byteBuffer切换成写入模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    System.out.println(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读取模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了,这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}
1.1 单线程反应器问题

当其中某个Handler阻塞时,会导致其他所有的Handler都得不到执行。在这种场景下,如果被阻塞的Handler不仅仅负责输入和输出处理的业务,还包括负责连接监听的AcceptorHandler处理器。这个是非常严重的问题

2 多线程反应器

  1. 将负责业务处理的线程单独放入线程池中,即将业务处理的线程和连接监听、IO查询的线程隔离
  2. 如何服务器为多核CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程,即工作线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器(类似多个单线程的多路复用)。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力
//多线程反应器
class MultiThreadEchoServerReactor {
    ServerSocketChannel serverSocket;
    AtomicInteger next = new AtomicInteger(0);

    Selector bossSelector = null;
    //selectors集合,引入多个selector选择器
    Selector[] workSelectors = new Selector[2];


    Reactor bossReactor = null;
    //引入多个子反应器
    Reactor[] workReactors = null;


    MultiThreadEchoServerReactor() throws IOException {

        //初始化多个selector选择器
        bossSelector = Selector.open();// 用于监听新连接事件
        workSelectors[0] = Selector.open(); // 用于监听read、write事件
        workSelectors[1] = Selector.open(); // 用于监听read、write事件

        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        serverSocket.configureBlocking(false);//非阻塞


        //bossSelector,负责监控新连接事件, 将 serverSocket注册到bossSelector
        SelectionKey sk = serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);
        //绑定Handler:新连接监控handler绑定到SelectionKey(选择键)
        sk.attach(new AcceptorHandler());


        //bossReactor反应器,处理新连接的bossSelector
        bossReactor = new Reactor(bossSelector);
        //第一个子反应器,一子反应器负责一个worker选择器
        Reactor workReactor1 = new Reactor(workSelectors[0]);
        //第二个子反应器,一子反应器负责一个worker选择器
        Reactor workReactor2 = new Reactor(workSelectors[1]);
        workReactors = new Reactor[]{workReactor1, workReactor2};
    }

    private void startService() {
        // 一子反应器对应一条线程
        new Thread(bossReactor).start(); //只负责连接的监听
        new Thread(workReactors[0]).start();
        new Thread(workReactors[1]).start();
    }

    //反应器
    //连接|工作
    class Reactor implements Runnable {
        //每条线程负责一个选择器的查询
        final Selector selector;

        public Reactor(Selector selector) {
            this.selector = selector;
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    //单位为毫秒
                    selector.select(1000);
                    //连接和工作有不同的选择器
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    if (null == selectedKeys || selectedKeys.size() == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        SelectionKey sk = it.next();
                        //不同的选择器,执行附件里面不同的逻辑
                        dispatch(sk);
                    }
                    selectedKeys.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }


        void dispatch(SelectionKey sk) {
            //取出附件,接下来去执行
            Runnable handler = (Runnable) sk.attachment();
            //调用之前attach绑定到选择键的handler处理器对象
            //执行逻辑
            if (handler != null) {
                handler.run();
            }
        }
    }


    // Handler:新连接处理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                Logger.info("接收到一个新的连接");

                //连接成功了,可能有数据过来,放入工作选择器,来监听
                if (channel != null) {
                    int index = next.get();
                    Logger.info("选择器的编号:" + index);
                    Selector selector = workSelectors[index];
                    //连接的通道 放到选择器中
                    new MultiThreadEchoHandler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (next.incrementAndGet() == workSelectors.length) {
                next.set(0);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                new MultiThreadEchoServerReactor();
        server.startService();
    }

}
class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入线程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);

    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        channel.configureBlocking(false);
        channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
        //仅仅取得选择键,后设置感兴趣的IO事件
        sk = channel.register(selector, 0);
        //将本Handler作为sk选择键的附件,方便事件dispatch
        sk.attach(this);
        //向sk选择键注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        //唤醒 查询线程,使得OP_READ生效
        selector.wakeup();
        Logger.info("新的连接 注册完成");

    }

    public void run() {
        //异步任务,在独立的线程池中执行
        //提交数据传输任务到线程池
        //使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
        pool.execute(new AsyncTask());
    }

    //异步任务,不在Reactor线程中执行
    //数据传输与业务处理任务,不在IO事件轮询线程中执行,在独立的线程池中执行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //写入通道
                channel.write(byteBuffer);

                //写完后,准备开始从通道读,byteBuffer切换成写模式
                byteBuffer.clear();
                //写完后,注册read就绪事件
                sk.interestOps(SelectionKey.OP_READ);
                //写完后,进入接收的状态
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //从通道读
                int length = 0;
                while ((length = channel.read(byteBuffer)) > 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //读完后,准备开始写入通道,byteBuffer切换成读模式
                byteBuffer.flip();
                //读完后,注册write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //读完后,进入发送的状态
                state = SENDING;
            }
            //处理结束了, 这里不能关闭select key,需要重复使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    //异步任务的内部类
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }

}

注意
使得IO处理不在IO事件轮询线程中执行,在独立的线程池中执行
pool.execute(new AsyncTask());。工作线程也是有限的,就几个,是非阻塞的,真正的去完成数据传输这种操作,还是新建线程去完成,不然整个服务不就变成几个工作线程了,工作线程来查询连接是否有数据需要处理,然后用线程池去处理。

引用:《Java高并发核心编程(卷1)》一书。做了更多的注释

09-22 14:09