不学无数的程序员

不学无数的程序员

NIO中和选择器Selector

在上一篇的JAVA中NIO再深入我们学会了如何使用Buffer ,而在Java中IO和NIO中我们略微了解到Channel的概念,我们知道了Channel就像矿洞里的铁轨一样,Buffer就像铁轨上的矿车,对于数据真正的操作都是对于Buffer的操作。而在NIO中还有一个非常重要的概念就是Selector,它就像矿洞里的调度系统一样。

为什么要有Selector

要理解为什么要有Selector?这个问题,我们首先得知道在UNIX系统中有五种I/O模型:同步阻塞I/O、同步非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O。这个几个I/O模型都是什么意思呢,大概比喻一下。

  • 阻塞式I/O模型:一个人在钓鱼,当没鱼上钩时,就坐在岸边一直等。
  • 非阻塞式I/O模型:边钓鱼边玩手机,隔会再看看有没有鱼上钩,有的话就迅速拉杆。
  • I/O复用模型:放了一堆鱼竿,在岸边一直守着这堆鱼竿,没鱼上钩就玩手机。
  • 信号驱动式I/O模型:鱼竿上系了个铃铛,当铃铛响,就知道鱼上钩,然后可以专心玩手机。
  • 异步I/O模型:雇佣一个人来给我钓鱼,钓上来以后给我送到住处,我该干嘛干嘛去。

而所谓的I/O就是计算机内存与外部设备之间数据拷贝的过程,我们知道CPU访问内存的速度远远高于外部设备,因此CPU通常就是先将外部设备的数据读取到内存中,然后再进行处理。然后此时有个场景,当那你的用户程序通过CPU向外部设备发送了一个读的指令,数据从外部设备到内存中是需要一段时间的,那么此时CPU是休息呢?还是让给别人?还是不断的询问,到了吗?到了吗?到了吗……?这个就是I/O模型所要解决的问题。

而我们的NIO模拟的I/O模型就是I/O复用模型。通过只阻塞Selector 这一个线程,通过Selector 不断的查询Channel中的状态,从而达到了一个线程控制Selector ,而一个Selector 控制多个Channel的目的。用图表示就是这样。

Selector使用

从图上面我们就可以猜出来大概的Selector 该如何来使用

创建Selector

通过调用Selector.open()方法来创建一个Selector。

Selector selector = Selector.open();

创建所需要的Channel

我们知道NIO中的Channel分为四种类型

  • FileChannel:文件通道
  • DatagramChannel:通过UDP读取网络中的数据
  • SocketChannel:通过TCP读取网络中的数据
  • ServerSocketChannel:可以监听进来的连接,对于每个进来的连接都会创建一个SocketChannel

在这四个通道中有一个不能和Selector 配合使用,因为从图中可以看出,我们的Selector 是不断的轮询注册在Selector 中的每个通道的状态,不能阻塞在其中一个通道,即每个通道必须是非阻塞状态的,但是FileChannel的通道是阻塞状态且不能更改,所以FileChannel不能和Selector 配合使用。

ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.socket().bind(new InetSocketAddress(8080));
//设置为非阻塞模式
socketChannel.configureBlocking(false);

将创建好的Channel注册到Selector上

为了便于Selector管理Channel,我们将Channel注册到Selector上。

//将Channel注册到Selector上
SelectionKey selectionKey = socketChannel.register(selector,SelectionKey.OP_READ);

我们可以看到第一个参数就是我们自己的Selector,而第二个参数就是选择要监听的事件类型,一共有四种

  • SelectionKey.OP_CONNECT:连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
  • SelectionKey.OP_ACCEPT:连接就绪事件,服务端收到客户端的一个连接请求会触发
  • SelectionKey.OP_READ:读就绪事件,表示通道中已经有可读的数据了,可以执行读操作
  • SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了

Selector循环遍历各个Channel

在上一步我们已经将所需要的Channel注册到了Selector 中,那么我们现在可以调用Selector.select()方法进行遍历得到已经准备好的Channel


Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

完整的例子

做一个简单的服务器监听的程序。监听本机的8080端口,打印出发送过来的数据。


public class TestNIO {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        socketChannel.socket().bind(new InetSocketAddress(8080));
        //设置为非阻塞模式
        socketChannel.configureBlocking(false);
        //将Channel注册到Selector上
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            int readyChannel = selector.select();
            if (readyChannel == 0){
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()){
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                if (key.isAcceptable()){
                    System.out.println("isAcceptable");
                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(key.selector(),SelectionKey.OP_READ);
                }
                else if (key.isConnectable()){
                    System.out.println("isConnectable");
                }
                else if (key.isReadable()){
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    clientChannel.read(byteBuffer);
                    System.out.println(new String(byteBuffer.array()));
                }else if (key.isWritable()){
                    System.out.println("isWritable");
                }
            }
        }
    }
}

此时可以通过在控制台用命令telnet localhost 8080即可与服务器连接。

参考文章

05-27 12:01